#!/usr/bin/env python # -*- coding: UTF-8 -*- """ /*************************************************************************** Docker-OSM Enrich An enrich database of docker osm. ------------------- begin : 2019-03-13 email : irwan at kartoza dot com contributor : Irwan Fathurrahman ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * ***************************************************************************/ """ import gzip from os import environ, listdir, mkdir from os.path import join, exists, getsize from sys import exit, stderr from time import sleep from urllib import request import xmltodict import yaml from dateutil import parser from psycopg2 import connect, OperationalError, ProgrammingError from xmltodict import OrderedDict class Enrich(object): mapping_type = { 'point': 'node', 'linestring': 'way', 'polygon': 'way' } enriched_column = { 'changeset_id': 'int', 'changeset_version': 'int', 'changeset_timestamp': 'datetime', 'changeset_user': 'string' } latest_diff_file = None cache_folder = None out_of_scope_osm_folder = None def __init__(self): # Default values which can be overwritten by environment variable. self.default = { 'TIME': 120, 'POSTGRES_USER': 'docker', 'POSTGRES_PASS': 'docker', 'POSTGRES_DBNAME': 'gis', 'POSTGRES_HOST': 'db', 'POSTGRES_PORT': '5432', 'SETTINGS': 'settings', 'OSM_API_URL': 'https://api.openstreetmap.org/api/0.6/', 'IMPORT_DONE': 'import_done', 'CACHE': 'cache', 'MAX_DIFF_FILE_SIZE': 100000000, 'DBSCHEMA_PRODUCTION': 'public', 'CACHE_MODIFY_CHECK': '' } self.mapping_file = None self.mapping_database_schema = {} self.postgis_uri = None self.overwrite_environment() self.check_settings() def check_settings(self): """Perform various checking. This will run when the container is starting. If an error occurs, the container will stop. """ # Test files try: for f in listdir(self.default['SETTINGS']): if f.endswith('.yml'): self.mapping_file = join(self.default['SETTINGS'], f) except FileNotFoundError: pass if not self.mapping_file: self.error( 'Mapping file *.yml is missing in %s' % self.default['SETTINGS'] ) else: self.info('Mapping: ' + self.mapping_file) # In docker-compose, we should wait for the DB is ready. self.check_mapping_file_data() self.info('The checkup is OK.') # enrich cache_folder = self.default['CACHE'] if exists(cache_folder): cache_folder = join(cache_folder, 'enrich') if not exists(cache_folder): mkdir(cache_folder) # out_of_scope_osm out_of_scope_osm_folder = join( cache_folder, 'out_of_scope_osm') if not exists(out_of_scope_osm_folder): mkdir(out_of_scope_osm_folder) self.out_of_scope_osm_folder = out_of_scope_osm_folder self.cache_folder = cache_folder # check using not found cache for modify if self.default['CACHE_MODIFY_CHECK'].lower() == 'true': self.default['CACHE_MODIFY_CHECK'] = True else: self.default['CACHE_MODIFY_CHECK'] = False def get_cache_path(self): return join(self.cache_folder, 'cache') def get_cache_file(self): """ Return path of cache file return None if not found """ if self.cache_folder: if exists(self.cache_folder): cache_file = self.get_cache_path() if exists(cache_file): return cache_file return None def is_non_recognized_id(self, osm_type, osm_id): """ Return if osm id and type is unrecognized id """ if not self.default['CACHE_MODIFY_CHECK']: return False if self.out_of_scope_osm_folder: if exists( join(self.out_of_scope_osm_folder, '%s-%s' % (osm_type, osm_id))): return True return False def get_or_create_non_recognized_id(self, osm_type, osm_id): """ Create file as cache for non recognized id """ if not self.default['CACHE_MODIFY_CHECK']: return if self.out_of_scope_osm_folder: filename = join( self.out_of_scope_osm_folder, '%s-%s' % (osm_type, osm_id)) if not exists(filename): try: f = open(filename, 'w+') f.close() except IOError: self.info('%s can\'t be created' % filename) def check_mapping_file_data(self): """Perform converting yaml data into json that used for checking table on database """ self.info('Load Mapping file data.') document = open(self.mapping_file, 'r') mapping_data = yaml.load(document) try: for table, value in mapping_data['tables'].items(): try: type = value['type'] try: osm_type = self.mapping_type[type] osm_id_column = None osm_id_column_index = None columns = ['id'] for index, column in enumerate(value['columns']): columns.append(column['name']) try: if column['type'] == 'id': osm_id_column = column['name'] osm_id_column_index = index except KeyError: pass columns.extend(self.enriched_column.keys()) self.mapping_database_schema['osm_%s' % table] = { 'osm_type': osm_type, 'osm_id_columnn': osm_id_column, 'osm_id_columnn_index': osm_id_column_index, 'columns': columns } except KeyError: self.info('Type %s is not yet recognized by enrich.' % type) except KeyError: self.info( 'Table %s doesn\'t has "type" attribute' ) except KeyError: self.error( 'Mapping file %s doesn\'t has "tables" attribute' % self.mapping_file ) def create_connection(self): return connect( "dbname='%s' user='%s' host='%s' password='%s'" % ( self.default['POSTGRES_DBNAME'], self.default['POSTGRES_USER'], self.default['POSTGRES_HOST'], self.default['POSTGRES_PASS'])) def check_database(self): """Test connection to PostGIS and create the URI.""" try: connection = self.create_connection() cursor = connection.cursor() except OperationalError as e: print(e) try: for table, table_data in self.mapping_database_schema.items(): new_columns_postgis = [] for enrich_key, enrich_type in self.enriched_column.items(): check_column = ''' SELECT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='%s' and column_name='%s'); ''' % ( table, enrich_key) cursor.execute(check_column) column_existence = cursor.fetchone()[0] if column_existence != 1: if enrich_type == 'int': new_columns_postgis.append('ADD COLUMN IF NOT EXISTS %s NUMERIC' % enrich_key) elif enrich_type == 'string': new_columns_postgis.append( 'ADD COLUMN IF NOT EXISTS %s CHARACTER VARYING (255)' % enrich_key) elif enrich_type == 'datetime': new_columns_postgis.append('ADD COLUMN IF NOT EXISTS %s TIMESTAMPTZ' % enrich_key) if len(new_columns_postgis) > 0: query = 'ALTER TABLE %s."%s" %s;' % ( self.default['DBSCHEMA_PRODUCTION'], table, ','.join(new_columns_postgis)) cursor.execute(query) connection.commit() connection.close() return True except (OperationalError, ProgrammingError): connection.rollback() connection.close() return False @staticmethod def info(message): print(message) @staticmethod def error(message): print(stderr.write(message)) exit() def overwrite_environment(self): """Overwrite default values from the environment.""" for key in list(environ.keys()): if key in list(self.default.keys()): self.default[key] = environ[key] def check_data_on_dict(self, data, key): """ return data from dict with key return None if not found """ try: return data[key] except KeyError: return None def get_osm_enrich_new_data(self, from_osm, from_database): """ Convert data from xml of osm into json and check if from osm is newer than from database :param from_osm: Data that got from osm :type from_osm: dict :param from_database: Data that in local database :type from_database: dict :return: Dictionary of new data that need to be inserted :rtype: dict """ osm_id = self.check_data_on_dict(from_osm, '@id') row = from_database new_data = [] if osm_id and row: allow_updated = False osm_timestamp = self.check_data_on_dict(from_osm, '@timestamp') osm_datetime = parser.parse(osm_timestamp).replace(tzinfo=None) if not row['changeset_timestamp'] or row['changeset_timestamp'] < osm_datetime: allow_updated = True if allow_updated: osm_version = self.check_data_on_dict(from_osm, '@version') osm_changeset = self.check_data_on_dict(from_osm, '@changeset') osm_user = self.check_data_on_dict(from_osm, '@user') self.info('Update for %s' % osm_id) new_data = { 'changeset_id': osm_changeset, 'changeset_timestamp': osm_datetime, 'changeset_version': osm_version, 'changeset_user': osm_user } return new_data def update_enrich_into_database(self, table_name, osm_id_column, osm_id, new_data): """ Update new data into data of osm id :param table_name: Table source of rows :type table_name: str :param osm_id_column: Column name of osm_id :type osm_id_column: str :param osm_id: osm id of data :type osm_id: str :param new_data: new data that will be updated :type new_data: Dict """ if not new_data: return sets = [] for field, value in new_data.items(): try: value = value.replace('\'', '\'\'') except (TypeError, AttributeError): pass sets.append('%s=\'%s\'' % (field, value)) connection = self.create_connection() cursor = connection.cursor() try: query = 'UPDATE %s.%s SET %s WHERE %s=%s' % (self.default['DBSCHEMA_PRODUCTION'], table_name, ','.join(sets), osm_id_column, osm_id) cursor.execute(query) connection.commit() except ProgrammingError as e: connection.rollback() self.info('%s' % e) connection.close() # THIS PROCESS BELOW IS FOR EMPTY CHANGESET def update_osm_enrich_from_api_in_batch( self, osm_ids, osm_type, row_batch, table_name, osm_id_column): """ Get osm data from OSM API in Batch :param osm_ids: osm id in list :type osm_ids: list :param osm_type: feature type of this osm :type osm_type: str :param osm_type: feature type of this osm :type osm_type: str :param row_batch: Row data from local database in dictionary :type row_batch: Dict :param table_name: Table source of rows :type table_name: str :param osm_id_column: Column name of osm_id :type osm_id_column: str """ if len(osm_ids) == 0: return osm_type_on_url = osm_type + 's' url = join(self.default['OSM_API_URL'], osm_type_on_url) url += '?%s=%s' % (osm_type_on_url, ','.join(osm_ids)) self.info(url) content = None try: raw_content = request.urlopen(url).read() raw_content = xmltodict.parse(raw_content) if type(raw_content['osm'][osm_type]) == list: for osm in raw_content['osm'][osm_type]: osm_id = self.check_data_on_dict(osm, '@id') row = self.check_data_on_dict(row_batch, osm_id) new_data = self.get_osm_enrich_new_data( osm, row) self.update_enrich_into_database( table_name, osm_id_column, osm_id, new_data) else: osm_id = self.check_data_on_dict( raw_content['osm'][osm_type], '@id') row = self.check_data_on_dict(row_batch, osm_id) new_data = self.get_osm_enrich_new_data( raw_content['osm'][osm_type], row) self.update_enrich_into_database( table_name, osm_id_column, osm_id, new_data) except Exception as e: self.info('%s' % e) return content def process_empty_changeset_from_table(self, table_name, table_columns, osm_id_column, osm_type): """ Processing all data from table :param table_name: Table source :type table_name: str :param table_columns: columns of tables :type table_columns: list :param osm_type: feature type of this osm :type osm_type: str :param osm_type: feature type of this osm :type osm_type: str :param osm_id_column: Column name of osm_id :type osm_id_column: str """ # noinspection PyUnboundLocalVariable connection = self.create_connection() cursor = connection.cursor() row_batch = {} osm_ids = [] try: check_sql = ''' select * from %s."%s" WHERE "changeset_timestamp" IS NULL AND "osm_id" IS NOT NULL ORDER BY "osm_id" ''' % (self.default['DBSCHEMA_PRODUCTION'], table_name) cursor.execute(check_sql) row = True while row: # do something with row row = cursor.fetchone() if row: row = dict(zip(table_columns, row)) row_batch['%s' % row[osm_id_column]] = row osm_ids.append('%s' % row[osm_id_column]) if len(osm_ids) == 30: self.update_osm_enrich_from_api_in_batch( osm_ids, osm_type, row_batch, table_name, osm_id_column) row_batch = {} osm_ids = [] self.update_osm_enrich_from_api_in_batch( osm_ids, osm_type, row_batch, table_name, osm_id_column) except ProgrammingError as e: connection.rollback() self.info('%s' % e) def enrich_empty_changeset(self): """Enrich database that has empty changeset by using OSM API URL """ self.info('Enrich Database with empty changeset') for table, table_data in self.mapping_database_schema.items(): osm_id_columnn = table_data['osm_id_columnn'] osm_type = table_data['osm_type'] columns = table_data['columns'] if osm_id_columnn is not None: self.info('Checking data from table %s' % table) self.process_empty_changeset_from_table( table, columns, osm_id_columnn, osm_type) else: self.info('Does not know osm_id column for %s.' % table) # THIS PROCESS BELOW IS FOR CHECKING DIFF FILES def enrich_database_from_osm_data(self, osm_data, osm_data_type): """ Convert data from xml of osm into json and check if from osm is newer than from database :param from_osm: Data that got from osm :type from_osm: dict :param from_database: Data that in local database :type from_database: dict :return: Dictionary of new data that need to be inserted :rtype: dict """ osm_id = self.check_data_on_dict( osm_data, '@id') for table, table_data in self.mapping_database_schema.items(): if osm_data_type == table_data['osm_type']: # check if this osm is not found on database if self.is_non_recognized_id(osm_data_type, osm_id): continue connection = self.create_connection() cursor = connection.cursor() try: validate_sql = ''' select * from %s."%s" WHERE "%s"=%s ''' % (self.default['DBSCHEMA_PRODUCTION'], table, table_data['osm_id_columnn'], osm_id) cursor.execute(validate_sql) row = cursor.fetchone() if row: row = dict(zip(table_data['columns'], row)) new_data = self.get_osm_enrich_new_data(osm_data, row) self.update_enrich_into_database( table, table_data['osm_id_columnn'], osm_id, new_data) else: # if this id is not found add in cache self.get_or_create_non_recognized_id(osm_data_type, osm_id) except Exception as e: self.info('error when processing %s: %s' % (osm_id, e)) connection.close() def enrich_database_from_diff_file(self): # check latest diff file if self.get_cache_file(): self.latest_diff_file = open(self.get_cache_file(), "r").read() # get list diff file next_latest_diff_file = None target_folder = self.default['IMPORT_DONE'] self.info('Enrich Database with diff file in %s' % self.default['IMPORT_DONE']) if not exists(target_folder): self.info('Folder %s is not ready yet' % target_folder) return for filename in sorted(listdir(target_folder)): try: if filename.endswith('.gz'): if not self.latest_diff_file or self.latest_diff_file < filename: self.info('Processing %s' % filename) # if it is newest file # process for getting this gzip_file = join(target_folder, filename) if getsize(gzip_file) > self.default['MAX_DIFF_FILE_SIZE']: self.info('File is too big, skip it') continue f = gzip.open(gzip_file, 'rb') file_content = f.read() f.close() raw_content = xmltodict.parse(file_content) try: modify_list = raw_content['osmChange']['modify'] for list in modify_list: for key, value in list.items(): if type(value) != OrderedDict: for osm_data in value: self.enrich_database_from_osm_data( osm_data, key ) else: self.enrich_database_from_osm_data( value, key ) except KeyError: self.info('%s can not be opened' % filename) if not next_latest_diff_file or next_latest_diff_file < filename: next_latest_diff_file = filename except Exception as e: self.info('Error when processing %s : %s' % (filename, e)) if next_latest_diff_file: try: cache_file = self.get_cache_path() f = open(cache_file, 'w') f.write(next_latest_diff_file) f.close() except IOError: self.info('cache file can\'t be created') def locate_table(self, name, schema): """Check for tables in the DB table exists in the DB""" connection = self.create_connection() cursor = connection.cursor() sql = """ SELECT EXISTS (SELECT 1 AS result from information_schema.tables where table_name like TEMP_TABLE and table_schema = 'TEMP_SCHEMA'); """ cursor.execute(sql.replace('TEMP_TABLE', '%s' % name).replace('TEMP_SCHEMA', '%s' % schema)) # noinspection PyUnboundLocalVariable return cursor.fetchone()[0] def run(self): """First checker.""" while True: self.info('Run enrich process') osm_tables = self.locate_table("'osm_%'", self.default['DBSCHEMA_PRODUCTION']) if osm_tables != 1: self.info('Imposm is still running, wait a while and try again') else: if self.check_database(): self.enrich_empty_changeset() self.enrich_database_from_diff_file() # sleep looping self.info('sleeping for %s' % self.default['TIME']) sleep(float(self.default['TIME'])) if __name__ == '__main__': enrich = Enrich() enrich.run()