diff --git a/docker-compose.yml b/docker-compose.yml
index fae04c6..dd6060f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -26,7 +26,7 @@ services:
 #    ports:
 #      - "35432:5432"
     healthcheck:
-      test: "exit 0"
+      test: "exit 0"
 
   osm_downloader:
     image: kartoza/docker-osm:pbf-downloader
@@ -54,8 +54,8 @@ services:
       - import_queue:/home/import_queue
       - cache:/home/cache
     depends_on:
-        db:
-          condition: service_healthy
+      db:
+        condition: service_healthy
     environment:
       - POSTGRES_USER=docker
       - POSTGRES_PASS=docker
@@ -138,3 +138,26 @@ services:
     depends_on:
       db:
         condition: service_healthy
+
+  osmenrich:
+    build: docker-osmenrich
+    container_name: dockerosm_osmenrich
+    volumes:
+      # These are sharable to other containers
+      - ./settings:/home/settings
+      - import_done:/home/import_done
+      - import_queue:/home/import_queue
+      - cache:/home/cache
+    depends_on:
+      db:
+        condition: service_healthy
+    environment:
+      # These are all currently the defaults but listed here for your
+      # convenience if you want to change them
+      # folder for diff which hasn't been imported yet
+      - IMPORT_QUEUE=import_queue
+      # folder for diff which has been imported
+      - IMPORT_DONE=import_done
+      # seconds between 2 executions of the script
+      # if 0, then no update will be done, only the first initial import from the PBF
+      - TIME=120
diff --git a/docker-osmenrich/Dockerfile b/docker-osmenrich/Dockerfile
new file mode 100644
index 0000000..43dd6a1
--- /dev/null
+++ b/docker-osmenrich/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3
+MAINTAINER Irwan Fathurrahman
+
+ADD requirements.txt /home/requirements.txt
+RUN pip3 install -r /home/requirements.txt
+
+ADD enrich.py /home/
+
+WORKDIR /home
+CMD ["python3", "-u", "/home/enrich.py"]
+
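
For local development it can help to run the enrich loop outside of Docker. Below is a minimal sketch, not part of this change: it assumes the database container publishes its port on `localhost:35432` (the commented-out `ports` mapping above) and that it is run from the `docker-osmenrich` directory; all other values mirror the compose defaults.

```python
# Hypothetical local run of the enrich loop, outside of Docker.
# The published port 35432 and the ../settings path are assumptions;
# the remaining values mirror the docker-compose defaults.
import os

os.environ.update({
    'POSTGRES_HOST': 'localhost',
    'POSTGRES_PORT': '35432',
    'POSTGRES_USER': 'docker',
    'POSTGRES_PASS': 'docker',
    'POSTGRES_DBNAME': 'gis',
    'SETTINGS': '../settings',  # folder containing the *.yml mapping file
    'TIME': '30',               # poll every 30 seconds instead of 120
})

from enrich import Enrich  # noqa: E402  (environment must be set first)

Enrich().run()
```
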
diff --git a/docker-osmenrich/enrich.py b/docker-osmenrich/enrich.py
new file mode 100644
index 0000000..5b30754
--- /dev/null
+++ b/docker-osmenrich/enrich.py
@@ -0,0 +1,570 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+"""
+/***************************************************************************
+ Docker-OSM Enrich
+ An enrichment script for the Docker-OSM database.
+                              -------------------
+        begin                : 2019-03-13
+        email                : irwan at kartoza dot com
+        contributor          : Irwan Fathurrahman
+ ***************************************************************************/
+/***************************************************************************
+ *                                                                         *
+ *   This program is free software; you can redistribute it and/or modify  *
+ *   it under the terms of the GNU General Public License as published by  *
+ *   the Free Software Foundation; either version 2 of the License, or     *
+ *   (at your option) any later version.                                   *
+ *                                                                         *
+ ***************************************************************************/
+"""
+
+import gzip
+import xmltodict
+import yaml
+from collections import OrderedDict
+from dateutil import parser
+from os import environ, listdir, mkdir
+from os.path import join, exists, getsize
+from sys import exit, stderr
+from urllib import request
+from psycopg2 import connect, OperationalError, ProgrammingError
+from time import sleep
+
+
+class Enrich(object):
+    mapping_type = {
+        'point': 'node',
+        'linestring': 'way',
+        'polygon': 'way'
+    }
+    enriched_column = {
+        'changeset_id': 'int',
+        'changeset_version': 'int',
+        'changeset_timestamp': 'datetime',
+        'changeset_user': 'string'
+    }
+    latest_diff_file = None
+    cache_folder = None
+    out_of_scope_osm_folder = None
+
+    def __init__(self):
+        # Default values which can be overwritten by environment variables.
+        self.default = {
+            'TIME': 120,
+            'POSTGRES_USER': 'docker',
+            'POSTGRES_PASS': 'docker',
+            'POSTGRES_DBNAME': 'gis',
+            'POSTGRES_HOST': 'db',
+            'POSTGRES_PORT': '5432',
+            'SETTINGS': 'settings',
+            'OSM_API_URL': 'https://api.openstreetmap.org/api/0.6/',
+            'IMPORT_DONE': 'import_done',
+            'CACHE': 'cache',
+            'MAX_DIFF_FILE_SIZE': 100000000,
+            'CACHE_MODIFY_CHECK': ''
+        }
+        self.mapping_file = None
+        self.mapping_database_schema = {}
+        self.postgis_uri = None
+        self.overwrite_environment()
+        self.check_settings()
+
+    def check_settings(self):
+        """Perform various checks.
+
+        This runs when the container starts. If an error occurs, the
+        container stops.
+        """
+        # Test files
+        try:
+            for f in listdir(self.default['SETTINGS']):
+                if f.endswith('.yml'):
+                    self.mapping_file = join(self.default['SETTINGS'], f)
+        except FileNotFoundError:
+            pass
+
+        if not self.mapping_file:
+            self.error(
+                'Mapping file *.yml is missing in %s' % self.default['SETTINGS']
+            )
+        else:
+            self.info('Mapping: ' + self.mapping_file)
+
+        # In docker-compose, we should wait until the DB is ready.
+        self.check_mapping_file_data()
+        self.info('The checkup is OK.')
+
+        # enrich cache folder
+        cache_folder = self.default['CACHE']
+        if exists(cache_folder):
+            cache_folder = join(cache_folder, 'enrich')
+            if not exists(cache_folder):
+                mkdir(cache_folder)
+
+            # out_of_scope_osm
+            out_of_scope_osm_folder = join(
+                cache_folder, 'out_of_scope_osm')
+            if not exists(out_of_scope_osm_folder):
+                mkdir(out_of_scope_osm_folder)
+            self.out_of_scope_osm_folder = out_of_scope_osm_folder
+
+        self.cache_folder = cache_folder
+
+        # check whether the not-found cache is used for modify
+        if self.default['CACHE_MODIFY_CHECK'].lower() == 'true':
+            self.default['CACHE_MODIFY_CHECK'] = True
+        else:
+            self.default['CACHE_MODIFY_CHECK'] = False
+
+    def get_cache_path(self):
+        return join(self.cache_folder, 'cache')
+
+    def get_cache_file(self):
+        """ Return the path of the cache file,
+        or None if it is not found.
+        """
+        if self.cache_folder:
+            if exists(self.cache_folder):
+                cache_file = self.get_cache_path()
+                if exists(cache_file):
+                    return cache_file
+        return None
+
+    def is_non_recognized_id(self, osm_type, osm_id):
+        """ Return True if this osm type and id are not recognized.
+        """
+        if not self.default['CACHE_MODIFY_CHECK']:
+            return False
+
+        if self.out_of_scope_osm_folder:
+            if exists(
+                    join(self.out_of_scope_osm_folder,
+                         '%s-%s' % (osm_type, osm_id))):
+                return True
+        return False
+
+    def get_or_create_non_recognized_id(self, osm_type, osm_id):
+        """ Create a file as a cache marker for a non-recognized id.
+        """
+        if not self.default['CACHE_MODIFY_CHECK']:
+            return
+
+        if self.out_of_scope_osm_folder:
+            filename = join(
+                self.out_of_scope_osm_folder,
+                '%s-%s' % (osm_type, osm_id))
+            if not exists(filename):
+                try:
+                    f = open(filename, 'w+')
+                    f.close()
+                except IOError:
+                    self.info('%s can\'t be created' % filename)
+
+    def check_mapping_file_data(self):
+        """Convert the YAML mapping data into a dict
+        that is used to check the tables in the database.
+        """
+        self.info('Load mapping file data.')
+        document = open(self.mapping_file, 'r')
+        mapping_data = yaml.safe_load(document)
+        try:
+            for table, value in mapping_data['tables'].items():
+                try:
+                    geometry_type = value['type']
+                    try:
+                        osm_type = self.mapping_type[geometry_type]
+                        osm_id_column = None
+                        osm_id_column_index = None
+                        columns = ['id']
+                        for index, column in enumerate(value['columns']):
+                            columns.append(column['name'])
+                            try:
+                                if column['type'] == 'id':
+                                    osm_id_column = column['name']
+                                    osm_id_column_index = index
+                            except KeyError:
+                                pass
+
+                        columns.extend(self.enriched_column.keys())
+                        self.mapping_database_schema['osm_%s' % table] = {
+                            'osm_type': osm_type,
+                            'osm_id_column': osm_id_column,
+                            'osm_id_column_index': osm_id_column_index,
+                            'columns': columns
+                        }
+                    except KeyError:
+                        self.info('Type %s is not yet recognized by enrich.' % geometry_type)
+                except KeyError:
+                    self.info(
+                        'Table %s doesn\'t have a "type" attribute' % table
+                    )
+
+        except KeyError:
+            self.error(
+                'Mapping file %s doesn\'t have a "tables" attribute' % self.mapping_file
+            )
+
+    def create_connection(self):
+        return connect(
+            "dbname='%s' user='%s' host='%s' port='%s' password='%s'" % (
+                self.default['POSTGRES_DBNAME'],
+                self.default['POSTGRES_USER'],
+                self.default['POSTGRES_HOST'],
+                self.default['POSTGRES_PORT'],
+                self.default['POSTGRES_PASS']))
+
+    def check_database(self):
+        """Test the connection to PostGIS and add the enriched columns
+        when they are missing."""
+        connection = self.create_connection()
+        cursor = connection.cursor()
+        try:
+            for table, table_data in self.mapping_database_schema.items():
+                new_columns_postgis = []
+                for enrich_key, enrich_type in self.enriched_column.items():
+                    try:
+                        cursor.execute('select %s from %s' % (enrich_key, table))
+                    except ProgrammingError:
+                        connection.rollback()
+                        if enrich_type == 'int':
+                            new_columns_postgis.append('ADD COLUMN %s INTEGER' % enrich_key)
+                        elif enrich_type == 'string':
+                            new_columns_postgis.append('ADD COLUMN %s VARCHAR' % enrich_key)
+                        elif enrich_type == 'datetime':
+                            new_columns_postgis.append('ADD COLUMN %s timestamp' % enrich_key)
+                if len(new_columns_postgis) > 0:
+                    query = 'ALTER TABLE %s %s' % (table, ','.join(new_columns_postgis))
+                    cursor.execute(query)
+                    connection.commit()
+            connection.close()
+            return True
+        except (OperationalError, ProgrammingError):
+            connection.rollback()
+            connection.close()
+            return False
+
+    @staticmethod
+    def info(message):
+        print(message)
+
+    @staticmethod
+    def error(message):
+        stderr.write(message + '\n')
+        exit(1)
+
+    def overwrite_environment(self):
+        """Overwrite default values from the environment."""
+        for key in list(environ.keys()):
+            if key in list(self.default.keys()):
+                self.default[key] = environ[key]
+
+    def check_data_on_dict(self, data, key):
+        """ Return data[key],
+        or None if the key is not found.
+        """
+        try:
+            return data[key]
+        except KeyError:
+            return None
+
+    def get_osm_enrich_new_data(self, from_osm, from_database):
+        """ Convert data from the OSM XML into a dict
+        and check whether the OSM data is newer than the database row.
+
+        :param from_osm: Data fetched from OSM
+        :type from_osm: dict
+
+        :param from_database: Data in the local database
+        :type from_database: dict
+
+        :return: Dictionary of new data that needs to be saved
+        :rtype: dict
+        """
+        osm_id = self.check_data_on_dict(from_osm, '@id')
+        row = from_database
+        new_data = {}
+        if osm_id and row:
+            allow_updated = False
+            osm_timestamp = self.check_data_on_dict(from_osm, '@timestamp')
+            osm_datetime = parser.parse(osm_timestamp).replace(tzinfo=None)
+            if not row['changeset_timestamp'] or row['changeset_timestamp'] < osm_datetime:
+                allow_updated = True
+            if allow_updated:
+                osm_version = self.check_data_on_dict(from_osm, '@version')
+                osm_changeset = self.check_data_on_dict(from_osm, '@changeset')
+                osm_user = self.check_data_on_dict(from_osm, '@user')
+                self.info('Update for %s' % osm_id)
+                new_data = {
+                    'changeset_id': osm_changeset,
+                    'changeset_timestamp': osm_datetime,
+                    'changeset_version': osm_version,
+                    'changeset_user': osm_user
+                }
+        return new_data
+
""" Update new data into data of osm id + + :param table_name: Table source of rows + :type table_name: str + + :param osm_id_column: Column name of osm_id + :type osm_id_column: str + + :param osm_id: osm id of data + :type osm_id: str + + :param new_data: new data that will be updated + :type new_data: Dict + """ + if not new_data: + return + sets = [] + for field, value in new_data.items(): + try: + value = value.replace('\'', '\'\'') + except (TypeError, AttributeError): + pass + sets.append('%s=\'%s\'' % (field, value)) + connection = self.create_connection() + cursor = connection.cursor() + try: + query = 'UPDATE %s SET %s WHERE %s=%s' % ( + table_name, ','.join(sets), osm_id_column, osm_id) + cursor.execute(query) + connection.commit() + except ProgrammingError as e: + connection.rollback() + self.info('%s' % e) + connection.close() + + # THIS PROCESS BELOW IS FOR EMPTY CHANGESET + def update_osm_enrich_from_api_in_batch( + self, osm_ids, osm_type, row_batch, table_name, osm_id_column): + """ Get osm data from OSM API in Batch + + :param osm_ids: osm id in list + :type osm_ids: list + + :param osm_type: feature type of this osm + :type osm_type: str + + :param osm_type: feature type of this osm + :type osm_type: str + + :param row_batch: Row data from local database in dictionary + :type row_batch: Dict + + :param table_name: Table source of rows + :type table_name: str + + :param osm_id_column: Column name of osm_id + :type osm_id_column: str + """ + if len(osm_ids) == 0: + return + osm_type_on_url = osm_type + 's' + url = join(self.default['OSM_API_URL'], osm_type_on_url) + url += '?%s=%s' % (osm_type_on_url, ','.join(osm_ids)) + self.info(url) + content = None + try: + raw_content = request.urlopen(url).read() + raw_content = xmltodict.parse(raw_content) + if type(raw_content['osm'][osm_type]) == list: + for osm in raw_content['osm'][osm_type]: + osm_id = self.check_data_on_dict(osm, '@id') + row = self.check_data_on_dict(row_batch, osm_id) + new_data = self.get_osm_enrich_new_data( + osm, row) + self.update_enrich_into_database( + table_name, osm_id_column, osm_id, new_data) + + else: + osm_id = self.check_data_on_dict( + raw_content['osm'][osm_type], '@id') + row = self.check_data_on_dict(row_batch, osm_id) + new_data = self.get_osm_enrich_new_data( + raw_content['osm'][osm_type], row) + self.update_enrich_into_database( + table_name, osm_id_column, osm_id, new_data) + + except Exception as e: + self.info('%s' % e) + return content + + def process_empty_changeset_from_table(self, table_name, table_columns, osm_id_column, osm_type): + """ Processing all data from table + + :param table_name: Table source + :type table_name: str + + :param table_columns: columns of tables + :type table_columns: list + + :param osm_type: feature type of this osm + :type osm_type: str + + :param osm_type: feature type of this osm + :type osm_type: str + + :param osm_id_column: Column name of osm_id + :type osm_id_column: str + """ + # noinspection PyUnboundLocalVariable + connection = self.create_connection() + cursor = connection.cursor() + row_batch = {} + osm_ids = [] + try: + cursor.execute( + 'select * from %s WHERE changeset_timestamp IS NULL AND osm_id IS NOT NULL ORDER BY osm_id' % table_name) + row = True + while row: + # do something with row + row = cursor.fetchone() + if row: + row = dict(zip(table_columns, row)) + row_batch['%s' % row[osm_id_column]] = row + osm_ids.append('%s' % row[osm_id_column]) + if len(osm_ids) == 30: + self.update_osm_enrich_from_api_in_batch( + osm_ids, 
+    def enrich_empty_changeset(self):
+        """Enrich rows that have an empty changeset by using the OSM API.
+        """
+        self.info('Enrich database rows with an empty changeset')
+        for table, table_data in self.mapping_database_schema.items():
+            osm_id_column = table_data['osm_id_column']
+            osm_type = table_data['osm_type']
+            columns = table_data['columns']
+            if osm_id_column is not None:
+                self.info('Checking data from table %s' % table)
+                self.process_empty_changeset_from_table(
+                    table, columns, osm_id_column, osm_type)
+            else:
+                self.info('No osm_id column is known for %s.' % table)
+
+    # THIS PROCESS BELOW IS FOR CHECKING DIFF FILES
+    def enrich_database_from_osm_data(self, osm_data, osm_data_type):
+        """ Check one OSM element from a diff file against every mapped table
+        and update the enriched columns when the element is newer.
+
+        :param osm_data: OSM element parsed from a diff file
+        :type osm_data: dict
+
+        :param osm_data_type: feature type of this element ('node' or 'way')
+        :type osm_data_type: str
+        """
+        osm_id = self.check_data_on_dict(
+            osm_data, '@id')
+        for table, table_data in self.mapping_database_schema.items():
+            if osm_data_type == table_data['osm_type']:
+
+                # skip if this osm id was already marked as not found
+                if self.is_non_recognized_id(osm_data_type, osm_id):
+                    continue
+
+                connection = self.create_connection()
+                cursor = connection.cursor()
+                try:
+                    cursor.execute('select * from %s WHERE %s=%s' % (
+                        table, table_data['osm_id_column'], osm_id))
+                    row = cursor.fetchone()
+                    if row:
+                        row = dict(zip(table_data['columns'], row))
+                        new_data = self.get_osm_enrich_new_data(osm_data, row)
+                        self.update_enrich_into_database(
+                            table, table_data['osm_id_column'], osm_id, new_data)
+                    else:
+                        # if this id is not found, add it to the cache
+                        self.get_or_create_non_recognized_id(osm_data_type, osm_id)
+                except Exception as e:
+                    self.info('error when processing %s: %s' % (osm_id, e))
+                connection.close()
+
+    def enrich_database_from_diff_file(self):
+        # check the latest processed diff file
+        if self.get_cache_file():
+            with open(self.get_cache_file(), 'r') as cache_file:
+                self.latest_diff_file = cache_file.read()
+
+        # get the list of diff files
+        next_latest_diff_file = None
+        target_folder = self.default['IMPORT_DONE']
+        self.info('Enrich database with diff files in %s' % self.default['IMPORT_DONE'])
+        if not exists(target_folder):
+            self.info('Folder %s is not ready yet' % target_folder)
+            return
+        for filename in sorted(listdir(target_folder)):
+            try:
+                if filename.endswith('.gz'):
+                    if not self.latest_diff_file or self.latest_diff_file < filename:
+                        self.info('Processing %s' % filename)
+                        # this file is newer than the cache, so process it
+                        gzip_file = join(target_folder, filename)
+                        if getsize(gzip_file) > int(self.default['MAX_DIFF_FILE_SIZE']):
+                            self.info('File is too big, skipping it')
+                            continue
+                        f = gzip.open(gzip_file, 'rb')
+                        file_content = f.read()
+                        f.close()
+                        raw_content = xmltodict.parse(file_content)
+                        try:
+                            modify_list = raw_content['osmChange']['modify']
+                            for modification in modify_list:
+                                for key, value in modification.items():
+                                    if not isinstance(value, OrderedDict):
+                                        for osm_data in value:
+                                            self.enrich_database_from_osm_data(
+                                                osm_data, key
+                                            )
+                                    else:
+                                        self.enrich_database_from_osm_data(
+                                            value, key
+                                        )
+                        except KeyError:
+                            self.info('No modify block found in %s' % filename)
+                        if not next_latest_diff_file or next_latest_diff_file < filename:
+                            next_latest_diff_file = filename
+            except Exception as e:
+                self.info('Error when processing %s : %s' % (filename, e))
+
+        if next_latest_diff_file:
+            try:
+                cache_file = self.get_cache_path()
+                f = open(cache_file, 'w')
+                f.write(next_latest_diff_file)
+                f.close()
+            except IOError:
+                self.info('cache file can\'t be created')
+
+    def run(self):
+        """Main loop."""
+        while True:
+            self.info('Run enrich process')
+            if self.check_database():
+                self.enrich_empty_changeset()
+                self.enrich_database_from_diff_file()
+            else:
+                self.info('Database is not ready')
+
+            # sleep between runs
+            self.info('sleeping for %s' % self.default['TIME'])
+            sleep(float(self.default['TIME']))
+
+
+if __name__ == '__main__':
+    enrich = Enrich()
+    enrich.run()
diff --git a/docker-osmenrich/readme.md b/docker-osmenrich/readme.md
new file mode 100644
index 0000000..e4b6b16
--- /dev/null
+++ b/docker-osmenrich/readme.md
@@ -0,0 +1,12 @@
+# Docker-osmenrich
+Docker-osmenrich is an extension for Docker-OSM that records the changeset of the OSM data.
+It fetches data from the OSM API and also reads the update data from the files generated by docker-osmupdate.
+
+- data is new (changeset is null): fetch the changeset from the OSM API
+- data exists but the changeset may be outdated: read the diff file generated by osmupdate and update the database
+
+osmenrich will create these new fields:
+- changeset_id
+- changeset_timestamp
+- changeset_version
+- changeset_user
\ No newline at end of file
diff --git a/docker-osmenrich/requirements.txt b/docker-osmenrich/requirements.txt
new file mode 100644
index 0000000..e5726d2
--- /dev/null
+++ b/docker-osmenrich/requirements.txt
@@ -0,0 +1,4 @@
+psycopg2-binary
+python-dateutil
+pyyaml
+xmltodict
\ No newline at end of file
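
Once osmenrich has run, the enriched columns can be read back like any other column. Here is a minimal sketch with psycopg2; `osm_buildings` is a hypothetical table name, since real names come from the `tables` section of your mapping file, prefixed with `osm_`, and the connection values assume the compose defaults with the port published on 35432.

```python
# Sketch: read the changeset columns added by osmenrich.
# osm_buildings is a hypothetical table; use "osm_" + a table key
# from your own mapping file.
from psycopg2 import connect

connection = connect(
    "dbname='gis' user='docker' host='localhost' port='35432' password='docker'")
cursor = connection.cursor()
cursor.execute(
    'SELECT osm_id, changeset_id, changeset_version, '
    'changeset_timestamp, changeset_user '
    'FROM osm_buildings '
    'WHERE changeset_timestamp IS NOT NULL '
    'ORDER BY changeset_timestamp DESC LIMIT 10')
for row in cursor.fetchall():
    print(row)
connection.close()
```
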
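For rows with an empty changeset, `update_osm_enrich_from_api_in_batch` asks the OSM API for up to 30 elements per request. The sketch below reproduces that call on its own so the URL shape is easy to see; the node ids are placeholders, and ways would use the `ways?ways=...` endpoint instead.

```python
# Sketch of the batched OSM API call used for rows with an empty changeset.
# The ids below are placeholders, not real data from this project.
from urllib import request

import xmltodict

url = 'https://api.openstreetmap.org/api/0.6/nodes?nodes=100,101,102'
raw_content = xmltodict.parse(request.urlopen(url).read())
nodes = raw_content['osm']['node']
if not isinstance(nodes, list):
    # a single element is not wrapped in a list by xmltodict
    nodes = [nodes]
for node in nodes:
    print(node['@id'], node['@version'], node['@timestamp'], node.get('@user'))
```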