From c3692553c206042efc2c1f62aae5121cb23a4f0a Mon Sep 17 00:00:00 2001
From: Etienne Trimaille
Date: Tue, 29 Dec 2015 14:05:57 +0100
Subject: [PATCH 1/2] refactoring osmupdate

---
 docker-osmupdate/download.py | 204 +++++++++++++++++++----------------
 1 file changed, 113 insertions(+), 91 deletions(-)

diff --git a/docker-osmupdate/download.py b/docker-osmupdate/download.py
index 3f5e995..3b31ef9 100644
--- a/docker-osmupdate/download.py
+++ b/docker-osmupdate/download.py
@@ -27,113 +27,135 @@ from datetime import datetime
 from time import sleep
 from sys import stderr

-# Default values which can be overwritten.
-default = {
-    'MAX_DAYS': '100',
-    'DIFF': 'sporadic',
-    'MAX_MERGE': '7',
-    'COMPRESSION_LEVEL': '1',
-    'BASE_URL': 'http://planet.openstreetmap.org/replication/',
-    'IMPORT_QUEUE': 'import_queue',
-    'IMPORT_DONE': 'import_done',
-    'SETTINGS': 'settings',
-    'TIME': 120,
-}
-for key in environ.keys():
-    if key in default.keys():
-        default[key] = environ[key]
+class Downloader(object):

-# Folders
-folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS']
-for folder in folders:
-    if not isabs(default[folder]):
-        # Get the absolute path.
-        default[folder] = abspath(default[folder])
+    def __init__(self):
+        # Default values which can be overwritten.
+        self.default = {
+            'MAX_DAYS': '100',
+            'DIFF': 'sporadic',
+            'MAX_MERGE': '7',
+            'COMPRESSION_LEVEL': '1',
+            'BASE_URL': 'http://planet.openstreetmap.org/replication/',
+            'IMPORT_QUEUE': 'import_queue',
+            'IMPORT_DONE': 'import_done',
+            'SETTINGS': 'settings',
+            'TIME': 120,
+        }
+        self.osm_file = None

-    # Test the folder
-    if not exists(default[folder]):
-        print >> stderr, 'The folder %s does not exist.' % default[folder]
+    @staticmethod
+    def info(message):
+        print message
+
+    @staticmethod
+    def error(message):
+        print >> stderr, message
         exit()

-# Test files
-osm_file = None
-for f in listdir(default['SETTINGS']):
+    def overwrite_environment(self):
+        """Overwrite default values from the environment."""
+        for key in environ.keys():
+            if key in self.default.keys():
+                self.default[key] = environ[key]

-    if f.endswith('.pbf'):
-        osm_file = join(default['SETTINGS'], f)
+    def check_settings(self):
+        """Perform various checking."""
+        # Folders
+        folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS']
+        for folder in folders:
+            if not isabs(self.default[folder]):
+                # Get the absolute path.
+                self.default[folder] = abspath(self.default[folder])

-    """
-    # Todo : need fix custom URL and sporadic diff : daily, hourly and minutely
-    if f == 'custom_url_diff.txt':
-        with open(join(default['SETTINGS'], f), 'r') as content_file:
-            default['BASE_URL'] = content_file.read()
-    """
+            # Test the folder
+            if not exists(self.default[folder]):
+                msg = 'The folder %s does not exist.' % self.default[folder]
+                self.error(msg)

-if not osm_file:
-    print >> stderr, 'OSM file *.osm.pbf is missing in %s' % default['SETTINGS']
-    exit()
+        # Test files
+        for f in listdir(self.default['SETTINGS']):

-# In docker-compose, we should wait for the DB is ready.
-print 'The checkup is OK. The container will continue soon, after the database.'
-sleep(45)
+            if f.endswith('.pbf'):
+                self.osm_file = join(self.default['SETTINGS'], f)

-# Finally launch the listening process.
-while True:
-    # Check if diff to be imported is empty. If not, take the latest diff.
-    diff_to_be_imported = sorted(listdir(default['IMPORT_QUEUE']))
-    if len(diff_to_be_imported):
-        file_name = diff_to_be_imported[-1].split('.')[0]
-        timestamp = file_name.split('->-')[1]
-        print 'Timestamp from the latest not imported diff : %s' % timestamp
-    else:
-        # Check if imported diff is empty. If not, take the latest diff.
-        imported_diff = sorted(listdir(default['IMPORT_DONE']))
-        if len(imported_diff):
-            file_name = imported_diff[-1].split('.')[0]
+        if not self.osm_file:
+            msg = 'OSM file *.osm.pbf is missing in %s' % self.default['SETTINGS']
+            self.error(msg)
+
+        self.info('The checkup is OK. The container will continue soon, after the database.')
+        sleep(45)
+
+    def _check_latest_timestamp(self):
+        """Fetch the latest timestamp."""
+        # Check if diff to be imported is empty. If not, take the latest diff.
+        diff_to_be_imported = sorted(listdir(self.default['IMPORT_QUEUE']))
+        if len(diff_to_be_imported):
+            file_name = diff_to_be_imported[-1].split('.')[0]
             timestamp = file_name.split('->-')[1]
-            print 'Timestamp from the latest imported diff : %s' % timestamp
-
+            self.info('Timestamp from the latest not imported diff : %s' % timestamp)
         else:
-            # Take the timestamp from original file.
-            command = ['osmconvert', osm_file, '--out-timestamp']
-            processus = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
-            timestamp, err = processus.communicate()
+            # Check if imported diff is empty. If not, take the latest diff.
+            imported_diff = sorted(listdir(self.default['IMPORT_DONE']))
+            if len(imported_diff):
+                file_name = imported_diff[-1].split('.')[0]
+                timestamp = file_name.split('->-')[1]
+                self.info('Timestamp from the latest imported diff : %s' % timestamp)

-            # Remove new line
-            timestamp = timestamp.strip()
+            else:
+                # Take the timestamp from original file.
+                command = ['osmconvert', self.osm_file, '--out-timestamp']
+                processus = Popen(
+                    command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+                timestamp, err = processus.communicate()

-            print 'Timestamp from the original state file : %s' % timestamp
+                # Remove new line
+                timestamp = timestamp.strip()

-    # Removing some \ in the timestamp.
-    timestamp = timestamp.replace('\\', '')
+                self.info('Timestamp from the original state file : %s' % timestamp)

-    # Save time
-    current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
-    print 'Old time : %s' % timestamp
-    print 'Current time : %s' % current_time
+        # Removing some \ in the timestamp.
+        timestamp = timestamp.replace('\\', '')
+        return timestamp

-    # Destination
-    file_name = '%s->-%s.osc.gz' % (timestamp, current_time)
-    file_path = join(default['IMPORT_QUEUE'], file_name)
+    def download(self):
+        """Infinite loop to download diff files on a regular interval."""
+        while True:
+            timestamp = self._check_latest_timestamp()

-    # Command
-    command = ['osmupdate', '-v']
-    command += ['--max-days=' + default['MAX_DAYS']]
-    command += [default['DIFF']]
-    command += ['--max-merge=' + default['MAX_MERGE']]
-    command += ['--compression-level=' + default['COMPRESSION_LEVEL']]
-    command += ['--base-url=' + default['BASE_URL']]
-    command.append(timestamp)
-    command.append(file_path)
+            # Save time
+            current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
+            self.info('Old time : %s' % timestamp)
+            self.info('Current time : %s' % current_time)

-    print ' '.join(command)
-    if call(command) != 0:
-        print >> stderr, 'An error occured in osmupdate. Let\'s try again.'
-        # Sleep less.
-        print 'Sleeping for 2 seconds.'
-        sleep(2.0)
-    else:
-        # Everything was fine, let's sleeping.
-        print 'Sleeping for %s seconds.' % default['TIME']
-        sleep(float(default['TIME']))
+            # Destination
+            file_name = '%s->-%s.osc.gz' % (timestamp, current_time)
+            file_path = join(self.default['IMPORT_QUEUE'], file_name)
+
+            # Command
+            command = ['osmupdate', '-v']
+            command += ['--max-days=' + self.default['MAX_DAYS']]
+            command += [self.default['DIFF']]
+            command += ['--max-merge=' + self.default['MAX_MERGE']]
+            command += ['--compression-level=' + self.default['COMPRESSION_LEVEL']]
+            command += ['--base-url=' + self.default['BASE_URL']]
+            command.append(timestamp)
+            command.append(file_path)
+
+            self.info(' '.join(command))
+            if call(command) != 0:
+                self.info('An error occured in osmupdate. Let\'s try again.')
+                # Sleep less.
+                self.info('Sleeping for 2 seconds.')
+                sleep(2.0)
+            else:
+                # Everything was fine, let's sleeping.
+                self.info('Sleeping for %s seconds.' % self.default['TIME'])
+                sleep(float(self.default['TIME']))
+
+if __name__ == '__main__':
+    downloader = Downloader()
+    downloader.overwrite_environment()
+    downloader.check_settings()
+    downloader.download()

From eb67fae10fb283e478a73f93a7be45bcad914906 Mon Sep 17 00:00:00 2001
From: Etienne Trimaille
Date: Tue, 29 Dec 2015 14:06:10 +0100
Subject: [PATCH 2/2] refactoring imposm

---
 docker-imposm3/importer.py | 391 +++++++++++++++++++++----------------
 1 file changed, 218 insertions(+), 173 deletions(-)

diff --git a/docker-imposm3/importer.py b/docker-imposm3/importer.py
index ba3ffb8..8aa5582 100644
--- a/docker-imposm3/importer.py
+++ b/docker-imposm3/importer.py
@@ -28,211 +28,256 @@ from subprocess import call
 from time import sleep
 from sys import stderr

-# All these default values can be overwritten by env vars
-default = {
-    'TIME': 120,
-    'USER': 'docker',
-    'PASSWORD': 'docker',
-    'DATABASE': 'gis',
-    'HOST': 'db',
-    'PORT': '5432',
-    'SETTINGS': 'settings',
-    'CACHE': 'cache',
-    'IMPORT_DONE': 'import_done',
-    'IMPORT_QUEUE': 'import_queue',
-    'SRID': '4326',
-    'OPTIMIZE': 'false',
-    'DBSCHEMA_PRODUCTION': 'public',
-    'DBSCHEMA_IMPORT': 'import',
-    'DBSCHEMA_BACKUP': 'backup',
-    'QGIS_STYLE': 'yes'
-}
-# Check if we overwrite default values.
-for key in environ.keys():
-    if key in default.keys():
-        default[key] = environ[key]
+class Importer(object):

-# Check valid SRID.
-if default['SRID'] not in ['4326', '3857']:
-    print >> stderr, 'SRID not supported : %s' % default['SRID']
-    exit()
+    def __init__(self):
+        # Default values which can be overwritten.
+        self.default = {
+            'TIME': 120,
+            'USER': 'docker',
+            'PASSWORD': 'docker',
+            'DATABASE': 'gis',
+            'HOST': 'db',
+            'PORT': '5432',
+            'SETTINGS': 'settings',
+            'CACHE': 'cache',
+            'IMPORT_DONE': 'import_done',
+            'IMPORT_QUEUE': 'import_queue',
+            'SRID': '4326',
+            'OPTIMIZE': 'false',
+            'DBSCHEMA_PRODUCTION': 'public',
+            'DBSCHEMA_IMPORT': 'import',
+            'DBSCHEMA_BACKUP': 'backup',
+            'QGIS_STYLE': 'yes'
+        }
+        self.osm_file = None
+        self.mapping_file = None
+        self.post_import_file = None
+        self.qgis_style = None

-# Check valid QGIS_STYLE.
-if default['QGIS_STYLE'] not in ['yes', 'no']:
-    print >> stderr, 'QGIS_STYLE not supported : %s' % default['QGIS_STYLE']
-    exit()
+        self.cursor = None
+        self.postgis_uri = None

-# Check folders.
-folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS', 'CACHE']
-for folder in folders:
-    if not isabs(default[folder]):
-        # Get the absolute path.
-        default[folder] = abspath(default[folder])
+    @staticmethod
+    def info(message):
+        print message

-    # Test the folder
-    if not exists(default[folder]):
-        print >> stderr, 'The folder %s does not exist.' % default[folder]
+    @staticmethod
+    def error(message):
+        print >> stderr, message
         exit()

-# Test files
-osm_file = None
-mapping_file = None
-post_import_file = None
-qgis_style = None
-for f in listdir(default['SETTINGS']):
+    def overwrite_environment(self):
+        """Overwrite default values from the environment."""
+        for key in environ.keys():
+            if key in self.default.keys():
+                self.default[key] = environ[key]

-    if f.endswith('.pbf'):
-        osm_file = join(default['SETTINGS'], f)
+    def check_settings(self):
+        """Perform various checking."""

-    if f.endswith('.json'):
-        mapping_file = join(default['SETTINGS'], f)
+        # Check valid SRID.
+        if self.default['SRID'] not in ['4326', '3857']:
+            msg = 'SRID not supported : %s' % self.default['SRID']
+            self.error(msg)

-    if f == 'post-pbf-import.sql':
-        post_import_file = join(default['SETTINGS'], f)
+        # Check valid QGIS_STYLE.
+        if self.default['QGIS_STYLE'] not in ['yes', 'no']:
+            msg = 'QGIS_STYLE not supported : %s' % self.default['QGIS_STYLE']
+            self.error(msg)

-    if f == 'qgis_style.sql':
-        qgis_style = join(default['SETTINGS'], f)
+        # Check folders.
+        folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS', 'CACHE']
+        for folder in folders:
+            if not isabs(self.default[folder]):
+                # Get the absolute path.
+                self.default[folder] = abspath(self.default[folder])

-if not osm_file:
-    print >> stderr, 'OSM file *.pbf is missing in %s' % default['SETTINGS']
-    exit()
+            # Test the folder
+            if not exists(self.default[folder]):
+                msg = 'The folder %s does not exist.' % self.default[folder]
+                self.error(msg)

-if not mapping_file:
-    print >> stderr, 'Mapping file *.json is missing in %s' % default['SETTINGS']
-    exit()
+        # Test files
+        for f in listdir(self.default['SETTINGS']):

-if not post_import_file:
-    print 'No *.sql detected in %s' % default['SETTINGS']
-else:
-    print '%s detected for post import.' % post_import_file
+            if f.endswith('.pbf'):
+                self.osm_file = join(self.default['SETTINGS'], f)

-if not qgis_style and default['QGIS_STYLE'] == 'yes':
-    print >> stderr, 'qgis_style.sql is missing in %s and QGIS_STYLE = yes.' % default['SETTINGS']
-    exit()
-elif qgis_style and default['QGIS_STYLE']:
-    print '%s detected for QGIS styling.' % qgis_style
-else:
-    print 'Not using QGIS default styles.'
+            if f.endswith('.json'):
+                self.mapping_file = join(self.default['SETTINGS'], f)

-# Create the timestamp file
-file_path = join(default['SETTINGS'], 'timestamp.txt')
-timestamp_file = open(file_path, 'w')
-timestamp_file.write('UNDEFINED\n')
-timestamp_file.close()
+            if f == 'post-pbf-import.sql':
+                self.post_import_file = join(self.default['SETTINGS'], f)

-# In docker-compose, we should wait for the DB is ready.
-print 'The checkup is OK. The container will continue soon, after the database.'
-sleep(45)
+            if f == 'qgis_style.sql':
+                self.qgis_style = join(self.default['SETTINGS'], f)

-# Check postgis.
-try:
-    connection = connect(
-        "dbname='%s' user='%s' host='%s' password='%s'" % (
-            default['DATABASE'],
-            default['USER'],
-            default['HOST'],
-            default['PASSWORD']))
-    cursor = connection.cursor()
-except OperationalError as e:
-    print >> stderr, e
-    exit()
+        if not self.osm_file:
+            msg = 'OSM file *.pbf is missing in %s' % self.default['SETTINGS']
+            self.error(msg)

-postgis_uri = 'postgis://%s:%s@%s/%s' % (
-    default['USER'],
-    default['PASSWORD'],
-    default['HOST'],
-    default['DATABASE'])
+        if not self.mapping_file:
+            msg = 'Mapping file *.json is missing in %s' % self.default['SETTINGS']
+            self.error(msg)
+
+        if not self.post_import_file:
+            self.info('No *.sql detected in %s' % self.default['SETTINGS'])
+        else:
+            self.info('%s detected for post import.' % self.post_import_file)
+
+        if not self.qgis_style and self.default['QGIS_STYLE'] == 'yes':
+            msg = 'qgis_style.sql is missing in %s and QGIS_STYLE = yes.' % self.default['SETTINGS']
+            self.error(msg)
+        elif self.qgis_style and self.default['QGIS_STYLE']:
+            self.info('%s detected for QGIS styling.' % self.qgis_style)
+        else:
+            self.info('Not using QGIS default styles.')

-# Check if there is a table starting with 'osm_'
-sql = 'select count(*) ' \
-      'from information_schema.tables ' \
-      'where table_name like \'osm_%\';'
-# noinspection PyUnboundLocalVariable
-cursor.execute(sql)
-osm_tables = cursor.fetchone()[0]
-if osm_tables < 1:
-    # It means that the DB is empty. Let's import the PBF file.
-    command = ['imposm3', 'import', '-diff', '-deployproduction']
-    command += ['-overwritecache', '-cachedir', default['CACHE']]
-    command += ['-srid', default['SRID']]
-    command += ['-dbschema-production', default['DBSCHEMA_PRODUCTION']]
-    command += ['-dbschema-import', default['DBSCHEMA_IMPORT']]
-    command += ['-dbschema-backup', default['DBSCHEMA_BACKUP']]
-    command += ['-diffdir', default['SETTINGS']]
-    command += ['-mapping', mapping_file]
-    command += ['-read', osm_file]
-    command += ['-write', '-connection', postgis_uri]
+        # In docker-compose, we should wait for the DB is ready.
+        self.info('The checkup is OK. The container will continue soon, after the database.')
+        sleep(45)

-    print 'The database is empty. Let\'s import the PBF : %s' % osm_file
-    print ' '.join(command)
-    if not call(command) == 0:
-        print >> stderr, 'An error occured in imposm with the original file.'
-        exit()
-    else:
-        print 'Import PBF successful : %s' % osm_file
+    def create_timestamp(self):
+        file_path = join(self.default['SETTINGS'], 'timestamp.txt')
+        timestamp_file = open(file_path, 'w')
+        timestamp_file.write('UNDEFINED\n')
+        timestamp_file.close()

-    if post_import_file or qgis_style:
-        # Set the password for psql
-        environ['PGPASSWORD'] = default['PASSWORD']
+    def update_timestamp(self, database_timestamp):
+        file_path = join(self.default['SETTINGS'], 'timestamp.txt')
+        timestamp_file = open(file_path, 'w')
+        timestamp_file.write('%s\n' % database_timestamp)
+        timestamp_file.close()

-    if post_import_file:
-        print 'Running the post import SQL file.'
+    def check_postgis(self):
+        try:
+            connection = connect(
+                "dbname='%s' user='%s' host='%s' password='%s'" % (
+                    self.default['DATABASE'],
+                    self.default['USER'],
+                    self.default['HOST'],
+                    self.default['PASSWORD']))
+            self.cursor = connection.cursor()
+        except OperationalError as e:
+            print >> stderr, e
+            exit()
+
+        self.postgis_uri = 'postgis://%s:%s@%s/%s' % (
+            self.default['USER'],
+            self.default['PASSWORD'],
+            self.default['HOST'],
+            self.default['DATABASE'])
+
+    def import_custom_sql(self):
+        self.info('Running the post import SQL file.')
         command = ['psql']
-        command += ['-h', default['HOST']]
-        command += ['-U', default['USER']]
-        command += ['-d', default['DATABASE']]
-        command += ['-f', post_import_file]
+        command += ['-h', self.default['HOST']]
+        command += ['-U', self.default['USER']]
+        command += ['-d', self.default['DATABASE']]
+        command += ['-f', self.post_import_file]
         call(command)

-    if qgis_style:
-        'Installing QGIS styles.'
+    def import_qgis_styles(self):
+        self.info('Installing QGIS styles.')
         command = ['psql']
-        command += ['-h', default['HOST']]
-        command += ['-U', default['USER']]
-        command += ['-d', default['DATABASE']]
-        command += ['-f', qgis_style]
+        command += ['-h', self.default['HOST']]
+        command += ['-U', self.default['USER']]
+        command += ['-d', self.default['DATABASE']]
+        command += ['-f', self.qgis_style]
         call(command)
-else:
-    print 'The database is not empty. Let\'s import only diff files.'

-# Finally launch the listening process.
-while True:
-    import_queue = sorted(listdir(default['IMPORT_QUEUE']))
-    if len(import_queue) > 0:
-        for diff in import_queue:
-            print 'Importing diff %s' % diff
-            command = ['imposm3', 'diff']
-            command += ['-cachedir', default['CACHE']]
-            command += ['-dbschema-production', default['DBSCHEMA_PRODUCTION']]
-            command += ['-dbschema-import', default['DBSCHEMA_IMPORT']]
-            command += ['-dbschema-backup', default['DBSCHEMA_BACKUP']]
-            command += ['-srid', default['SRID']]
-            command += ['-diffdir', default['SETTINGS']]
-            command += ['-mapping', mapping_file]
-            command += ['-connection', postgis_uri]
-            command += [join(default['IMPORT_QUEUE'], diff)]
+    def count_table(self, name):
+        """Check if there is a table starting with name."""
+        sql = 'select count(*) ' \
+              'from information_schema.tables ' \
+              'where table_name like \'%s\';' % name
+        # noinspection PyUnboundLocalVariable
+        self.cursor.execute(sql)
+        return self.cursor.fetchone()[0]

-            print ' '.join(command)
-            if call(command) == 0:
-                move(
-                    join(default['IMPORT_QUEUE'], diff),
-                    join(default['IMPORT_DONE'], diff))
+    def run(self):
+        osm_tables = self.count_table('osm_%')
+        if osm_tables < 1:
+            # It means that the DB is empty. Let's import the PBF file.
+            self._first_pbf_import()
+        else:
+            self.info('The database is not empty. Let\'s import only diff files.')

-                # Update the timestamp in the file.
-                database_timestamp = diff.split('.')[0].split('->-')[1]
-                file_path = join(default['SETTINGS'], 'timestamp.txt')
-                timestamp_file = open(file_path, 'w')
-                timestamp_file.write('%s\n' % database_timestamp)
-                timestamp_file.close()
+        self._import_diff()

-                print 'Import diff successful : %s' % diff
-            else:
-                print >> stderr, 'An error occured in imposm with a diff.'
-                exit()
+    def _first_pbf_import(self):
+        command = ['imposm3', 'import', '-diff', '-deployproduction']
+        command += ['-overwritecache', '-cachedir', self.default['CACHE']]
+        command += ['-srid', self.default['SRID']]
+        command += ['-dbschema-production',
+                    self.default['DBSCHEMA_PRODUCTION']]
+        command += ['-dbschema-import', self.default['DBSCHEMA_IMPORT']]
+        command += ['-dbschema-backup', self.default['DBSCHEMA_BACKUP']]
+        command += ['-diffdir', self.default['SETTINGS']]
+        command += ['-mapping', self.mapping_file]
+        command += ['-read', self.osm_file]
+        command += ['-write', '-connection', self.postgis_uri]

-    if len(listdir(default['IMPORT_QUEUE'])) == 0:
-        print 'Sleeping for %s seconds.' % default['TIME']
-        sleep(float(default['TIME']))
+        self.info('The database is empty. Let\'s import the PBF : %s' % self.osm_file)
+        self.info(' '.join(command))
+        if not call(command) == 0:
+            msg = 'An error occured in imposm with the original file.'
+            self.error(msg)
+        else:
+            self.info('Import PBF successful : %s' % self.osm_file)
+
+        if self.post_import_file or self.qgis_style:
+            # Set the password for psql
+            environ['PGPASSWORD'] = self.default['PASSWORD']
+
+        if self.post_import_file:
+            self.import_custom_sql()
+
+        if self.qgis_style:
+            self.import_qgis_styles()
+
+    def _import_diff(self):
+        # Finally launch the listening process.
+        while True:
+            import_queue = sorted(listdir(self.default['IMPORT_QUEUE']))
+            if len(import_queue) > 0:
+                for diff in import_queue:
+                    self.info('Importing diff %s' % diff)
+                    command = ['imposm3', 'diff']
+                    command += ['-cachedir', self.default['CACHE']]
+                    command += ['-dbschema-production', self.default['DBSCHEMA_PRODUCTION']]
+                    command += ['-dbschema-import', self.default['DBSCHEMA_IMPORT']]
+                    command += ['-dbschema-backup', self.default['DBSCHEMA_BACKUP']]
+                    command += ['-srid', self.default['SRID']]
+                    command += ['-diffdir', self.default['SETTINGS']]
+                    command += ['-mapping', self.mapping_file]
+                    command += ['-connection', self.postgis_uri]
+                    command += [join(self.default['IMPORT_QUEUE'], diff)]
+
+                    self.info(' '.join(command))
+                    if call(command) == 0:
+                        move(
+                            join(self.default['IMPORT_QUEUE'], diff),
+                            join(self.default['IMPORT_DONE'], diff))
+
+                        # Update the timestamp in the file.
+                        database_timestamp = diff.split('.')[0].split('->-')[1]
+                        self.update_timestamp(database_timestamp)
+
+                        self.info('Import diff successful : %s' % diff)
+                    else:
+                        msg = 'An error occured in imposm with a diff.'
+                        self.error(msg)
+
+            if len(listdir(self.default['IMPORT_QUEUE'])) == 0:
+                self.info('Sleeping for %s seconds.' % self.default['TIME'])
+                sleep(float(self.default['TIME']))
+
+if __name__ == '__main__':
+    importer = Importer()
+    importer.overwrite_environment()
+    importer.check_settings()
+    importer.create_timestamp()
+    importer.check_postgis()
+    importer.run()
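
Note (not part of the patches): the two containers coordinate only through the diff file names that osmupdate writes into IMPORT_QUEUE. As a minimal illustration of that convention, assuming nothing beyond the '<old_timestamp>->-<new_timestamp>.osc.gz' naming used in download.py and importer.py above, the sketch below recovers the newest timestamp from a queue directory the same way both scripts do. The helper name latest_timestamp and the queue_dir parameter are invented here purely for illustration.

    # Illustration only: osmupdate names each queued diff
    # '<old_timestamp>->-<new_timestamp>.osc.gz'; both scripts sort the
    # queue and keep the part after '->-' of the newest file name.
    from os import listdir

    def latest_timestamp(queue_dir):
        """Return the newest timestamp encoded in the queued diff names, or None."""
        diffs = sorted(listdir(queue_dir))
        if not diffs:
            return None
        file_name = diffs[-1].split('.')[0]   # strip the '.osc.gz' suffix
        return file_name.split('->-')[1]      # keep the newer timestamp

    # Example: a queue containing
    #   2015-12-28T00:00:00Z->-2015-12-29T00:00:00Z.osc.gz
    # yields '2015-12-29T00:00:00Z'.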