add osmenrich to put changesets data into database (#80)

* add osmenrich to put changesets data into database

* fix enrich cache save timing and ignore big file by env setting

* fix osmenrich rechecked from old cache date
pull/95/head
Irwan Fathurrahman 2020-01-15 16:15:54 +07:00 committed by mazano
parent 2fdf603a70
commit e8635da356
5 changed files with 623 additions and 3 deletions

docker-compose.yml

@@ -26,7 +26,7 @@ services:
# ports:
# - "35432:5432"
healthcheck:
test: "exit 0"
test: "exit 0"
osm_downloader:
image: kartoza/docker-osm:pbf-downloader
@@ -54,8 +54,8 @@ services:
- import_queue:/home/import_queue
- cache:/home/cache
depends_on:
-      db:
-        condition: service_healthy
+      db:
+        condition: service_healthy
environment:
- POSTGRES_USER=docker
- POSTGRES_PASS=docker
@@ -138,3 +138,26 @@ services:
depends_on:
db:
condition: service_healthy
osmenrich:
build: docker-osmenrich
container_name: dockerosm_osmenrich
volumes:
# These are sharable to other containers
- ./settings:/home/settings
- import_done:/home/import_done
- import_queue:/home/import_queue
- cache:/home/cache
depends_on:
db:
condition: service_healthy
environment:
# These are all currently the defaults but listed here for your
# convenience if you want to change them
# folder for diff which hasn't been imported yet
- IMPORT_QUEUE=import_queue
# folder for diff which has been imported
- IMPORT_DONE=import_done
      # seconds between 2 executions of the script
      - TIME=120
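
Once the stack is up, a quick way to confirm that osmenrich is writing changeset data is to query the new columns directly. A minimal sketch, assuming the commented-out `35432:5432` port mapping above is enabled and that your mapping file defines a hypothetical `osm_buildings` table:

from psycopg2 import connect

# credentials and database name follow the compose defaults above
connection = connect(
    "dbname='gis' user='docker' host='localhost' port='35432' password='docker'")
cursor = connection.cursor()
# count the rows that osmenrich has already enriched
cursor.execute(
    "SELECT count(*) FROM osm_buildings WHERE changeset_timestamp IS NOT NULL")
print(cursor.fetchone()[0])
connection.close()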

docker-osmenrich/Dockerfile

@@ -0,0 +1,11 @@
FROM python:3
MAINTAINER Irwan Fathurrahman <meomancer@gmail.com>
ADD requirements.txt /home/requirements.txt
RUN pip3 install -r /home/requirements.txt
ADD enrich.py /home/
WORKDIR /home
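# -u keeps Python output unbuffered so enrich.py log lines appear immediately in docker logs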
CMD ["python3", "-u", "/home/enrich.py"]

docker-osmenrich/enrich.py

@@ -0,0 +1,570 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
/***************************************************************************
Docker-OSM Enrich
An enrich database of docker osm.
-------------------
begin : 2019-03-13
email : irwan at kartoza dot com
contributor : Irwan Fathurrahman
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
import gzip
import xmltodict
import yaml
from collections import OrderedDict
from dateutil import parser
from os import environ, listdir, mkdir
from os.path import join, exists, getsize
from sys import exit, stderr
from urllib import request
from psycopg2 import connect, OperationalError, ProgrammingError
from time import sleep


class Enrich(object):
mapping_type = {
'point': 'node',
'linestring': 'way',
'polygon': 'way'
}
enriched_column = {
'changeset_id': 'int',
'changeset_version': 'int',
'changeset_timestamp': 'datetime',
'changeset_user': 'string'
}
latest_diff_file = None
cache_folder = None
out_of_scope_osm_folder = None
def __init__(self):
# Default values which can be overwritten by environment variable.
self.default = {
'TIME': 120,
'POSTGRES_USER': 'docker',
'POSTGRES_PASS': 'docker',
'POSTGRES_DBNAME': 'gis',
'POSTGRES_HOST': 'db',
'POSTGRES_PORT': '5432',
'SETTINGS': 'settings',
'OSM_API_URL': 'https://api.openstreetmap.org/api/0.6/',
'IMPORT_DONE': 'import_done',
'CACHE': 'cache',
'MAX_DIFF_FILE_SIZE': 100000000,
'CACHE_MODIFY_CHECK': ''
}
self.mapping_file = None
self.mapping_database_schema = {}
self.postgis_uri = None
self.overwrite_environment()
self.check_settings()
def check_settings(self):
"""Perform various checking.
This will run when the container is starting. If an error occurs, the
container will stop.
"""
# Test files
try:
for f in listdir(self.default['SETTINGS']):
if f.endswith('.yml'):
self.mapping_file = join(self.default['SETTINGS'], f)
except FileNotFoundError:
pass
if not self.mapping_file:
self.error(
'Mapping file *.yml is missing in %s' % self.default['SETTINGS']
)
else:
self.info('Mapping: ' + self.mapping_file)
        # In docker-compose, we should wait for the DB to be ready.
self.check_mapping_file_data()
self.info('The checkup is OK.')
# enrich
cache_folder = self.default['CACHE']
if exists(cache_folder):
cache_folder = join(cache_folder, 'enrich')
if not exists(cache_folder):
mkdir(cache_folder)
# out_of_scope_osm
out_of_scope_osm_folder = join(
cache_folder, 'out_of_scope_osm')
if not exists(out_of_scope_osm_folder):
mkdir(out_of_scope_osm_folder)
self.out_of_scope_osm_folder = out_of_scope_osm_folder
self.cache_folder = cache_folder
        # CACHE_MODIFY_CHECK: when 'true', osm ids that are not found in the
        # database are cached so later diff files can skip them
if self.default['CACHE_MODIFY_CHECK'].lower() == 'true':
self.default['CACHE_MODIFY_CHECK'] = True
else:
self.default['CACHE_MODIFY_CHECK'] = False
def get_cache_path(self):
return join(self.cache_folder, 'cache')
def get_cache_file(self):
""" Return path of cache file
return None if not found
"""
if self.cache_folder:
if exists(self.cache_folder):
cache_file = self.get_cache_path()
if exists(cache_file):
return cache_file
return None
def is_non_recognized_id(self, osm_type, osm_id):
""" Return if osm id and type is unrecognized id
"""
if not self.default['CACHE_MODIFY_CHECK']:
return False
if self.out_of_scope_osm_folder:
if exists(
join(self.out_of_scope_osm_folder,
'%s-%s' % (osm_type, osm_id))):
return True
return False
def get_or_create_non_recognized_id(self, osm_type, osm_id):
""" Create file as cache for non recognized id
"""
if not self.default['CACHE_MODIFY_CHECK']:
return
if self.out_of_scope_osm_folder:
filename = join(
self.out_of_scope_osm_folder,
'%s-%s' % (osm_type, osm_id))
if not exists(filename):
try:
f = open(filename, 'w+')
f.close()
except IOError:
self.info('%s can\'t be created' % filename)
def check_mapping_file_data(self):
"""Perform converting yaml data into json
that used for checking table on database
"""
self.info('Load Mapping file data.')
document = open(self.mapping_file, 'r')
        mapping_data = yaml.safe_load(document)
try:
for table, value in mapping_data['tables'].items():
try:
                    geometry_type = value['type']
                    try:
                        osm_type = self.mapping_type[geometry_type]
osm_id_column = None
osm_id_column_index = None
columns = ['id']
for index, column in enumerate(value['columns']):
columns.append(column['name'])
try:
if column['type'] == 'id':
osm_id_column = column['name']
osm_id_column_index = index
except KeyError:
pass
columns.extend(self.enriched_column.keys())
                        self.mapping_database_schema['osm_%s' % table] = {
                            'osm_type': osm_type,
                            'osm_id_column': osm_id_column,
                            'osm_id_column_index': osm_id_column_index,
                            'columns': columns
                        }
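                        # e.g. a hypothetical 'buildings' polygon table yields:
                        # 'osm_buildings': {'osm_type': 'way',
                        #                   'osm_id_column': 'osm_id',
                        #                   'osm_id_column_index': 0,
                        #                   'columns': ['id', ...]}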
except KeyError:
                        self.info('Type %s is not yet recognized by enrich.' % geometry_type)
except KeyError:
                    self.info(
                        'Table %s doesn\'t have a "type" attribute' % table
                    )
except KeyError:
self.error(
                'Mapping file %s doesn\'t have a "tables" attribute' % self.mapping_file
)
def create_connection(self):
        return connect(
            "dbname='%s' user='%s' host='%s' port='%s' password='%s'" % (
                self.default['POSTGRES_DBNAME'],
                self.default['POSTGRES_USER'],
                self.default['POSTGRES_HOST'],
                self.default['POSTGRES_PORT'],
                self.default['POSTGRES_PASS']))
def check_database(self):
"""Test connection to PostGIS and create the URI."""
connection = self.create_connection()
cursor = connection.cursor()
try:
for table, table_data in self.mapping_database_schema.items():
new_columns_postgis = []
for enrich_key, enrich_type in self.enriched_column.items():
try:
cursor.execute('select %s from %s' % (enrich_key, table))
except ProgrammingError as e:
connection.rollback()
if enrich_type == 'int':
new_columns_postgis.append('ADD COLUMN %s INTEGER' % enrich_key)
elif enrich_type == 'string':
new_columns_postgis.append('ADD COLUMN %s VARCHAR' % enrich_key)
elif enrich_type == 'datetime':
new_columns_postgis.append('ADD COLUMN %s timestamp' % enrich_key)
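                # every enrichment column whose probe SELECT failed above is
                # missing; add them all in a single ALTER TABLE statement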
if len(new_columns_postgis) > 0:
query = 'ALTER TABLE %s %s' % (table, ','.join(new_columns_postgis))
cursor.execute(query)
connection.commit()
connection.close()
return True
except (OperationalError, ProgrammingError):
connection.rollback()
connection.close()
return False
@staticmethod
def info(message):
print(message)
@staticmethod
    def error(message):
        stderr.write(message)
        exit(1)
def overwrite_environment(self):
"""Overwrite default values from the environment."""
for key in list(environ.keys()):
if key in list(self.default.keys()):
self.default[key] = environ[key]
def check_data_on_dict(self, data, key):
""" return data from dict with key
return None if not found
"""
try:
return data[key]
except KeyError:
return None
def get_osm_enrich_new_data(self, from_osm, from_database):
""" Convert data from xml of osm into json
and check if from osm is newer than from database
:param from_osm: Data that got from osm
:type from_osm: dict
:param from_database: Data that in local database
:type from_database: dict
:return: Dictionary of new data that need to be inserted
:rtype: dict
"""
osm_id = self.check_data_on_dict(from_osm, '@id')
row = from_database
        new_data = {}
if osm_id and row:
allow_updated = False
osm_timestamp = self.check_data_on_dict(from_osm, '@timestamp')
osm_datetime = parser.parse(osm_timestamp).replace(tzinfo=None)
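            # tzinfo is stripped above so both sides compare as naive datetimes;
            # update only when the row has no changeset yet or OSM has a newer one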
if not row['changeset_timestamp'] or row['changeset_timestamp'] < osm_datetime:
allow_updated = True
if allow_updated:
osm_version = self.check_data_on_dict(from_osm, '@version')
osm_changeset = self.check_data_on_dict(from_osm, '@changeset')
osm_user = self.check_data_on_dict(from_osm, '@user')
self.info('Update for %s' % osm_id)
new_data = {
'changeset_id': osm_changeset,
'changeset_timestamp': osm_datetime,
'changeset_version': osm_version,
'changeset_user': osm_user
}
return new_data
def update_enrich_into_database(self, table_name, osm_id_column, osm_id, new_data):
""" Update new data into data of osm id
:param table_name: Table source of rows
:type table_name: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
:param osm_id: osm id of data
:type osm_id: str
:param new_data: new data that will be updated
:type new_data: Dict
"""
if not new_data:
return
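        # build the SET clause, doubling single quotes as minimal escaping;
        # psycopg2 parameter binding would be a safer alternative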
sets = []
for field, value in new_data.items():
try:
value = value.replace('\'', '\'\'')
except (TypeError, AttributeError):
pass
sets.append('%s=\'%s\'' % (field, value))
connection = self.create_connection()
cursor = connection.cursor()
try:
query = 'UPDATE %s SET %s WHERE %s=%s' % (
table_name, ','.join(sets), osm_id_column, osm_id)
cursor.execute(query)
connection.commit()
except ProgrammingError as e:
connection.rollback()
self.info('%s' % e)
connection.close()
# THIS PROCESS BELOW IS FOR EMPTY CHANGESET
def update_osm_enrich_from_api_in_batch(
self, osm_ids, osm_type, row_batch, table_name, osm_id_column):
""" Get osm data from OSM API in Batch
:param osm_ids: osm id in list
:type osm_ids: list
:param osm_type: feature type of this osm
:type osm_type: str
:param row_batch: Row data from local database in dictionary
:type row_batch: Dict
:param table_name: Table source of rows
:type table_name: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
"""
if len(osm_ids) == 0:
return
osm_type_on_url = osm_type + 's'
url = join(self.default['OSM_API_URL'], osm_type_on_url)
url += '?%s=%s' % (osm_type_on_url, ','.join(osm_ids))
self.info(url)
content = None
try:
raw_content = request.urlopen(url).read()
raw_content = xmltodict.parse(raw_content)
            if isinstance(raw_content['osm'][osm_type], list):
for osm in raw_content['osm'][osm_type]:
osm_id = self.check_data_on_dict(osm, '@id')
row = self.check_data_on_dict(row_batch, osm_id)
new_data = self.get_osm_enrich_new_data(
osm, row)
self.update_enrich_into_database(
table_name, osm_id_column, osm_id, new_data)
else:
osm_id = self.check_data_on_dict(
raw_content['osm'][osm_type], '@id')
row = self.check_data_on_dict(row_batch, osm_id)
new_data = self.get_osm_enrich_new_data(
raw_content['osm'][osm_type], row)
self.update_enrich_into_database(
table_name, osm_id_column, osm_id, new_data)
except Exception as e:
self.info('%s' % e)
return content
def process_empty_changeset_from_table(self, table_name, table_columns, osm_id_column, osm_type):
""" Processing all data from table
:param table_name: Table source
:type table_name: str
:param table_columns: columns of tables
:type table_columns: list
:param osm_type: feature type of this osm
:type osm_type: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
"""
# noinspection PyUnboundLocalVariable
connection = self.create_connection()
cursor = connection.cursor()
row_batch = {}
osm_ids = []
try:
            cursor.execute(
                'select * from %s WHERE changeset_timestamp IS NULL '
                'AND %s IS NOT NULL ORDER BY %s' % (
                    table_name, osm_id_column, osm_id_column))
row = True
while row:
                # fetch rows one at a time, accumulating them into batches
row = cursor.fetchone()
if row:
row = dict(zip(table_columns, row))
row_batch['%s' % row[osm_id_column]] = row
osm_ids.append('%s' % row[osm_id_column])
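                    # flush a batch: one OSM API request per 30 ids, presumably
                    # to keep the request URL a manageable length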
if len(osm_ids) == 30:
self.update_osm_enrich_from_api_in_batch(
osm_ids, osm_type, row_batch, table_name, osm_id_column)
row_batch = {}
osm_ids = []
self.update_osm_enrich_from_api_in_batch(
osm_ids, osm_type, row_batch, table_name, osm_id_column)
        except ProgrammingError as e:
            connection.rollback()
            self.info('%s' % e)
        connection.close()
def enrich_empty_changeset(self):
"""Enrich database that has empty changeset by using OSM API URL
"""
self.info('Enrich Database with empty changeset')
        for table, table_data in self.mapping_database_schema.items():
            osm_id_column = table_data['osm_id_column']
            osm_type = table_data['osm_type']
            columns = table_data['columns']
            if osm_id_column is not None:
                self.info('Checking data from table %s' % table)
                self.process_empty_changeset_from_table(
                    table, columns, osm_id_column, osm_type)
            else:
                self.info('Unknown osm_id column for table %s.' % table)
# THIS PROCESS BELOW IS FOR CHECKING DIFF FILES
def enrich_database_from_osm_data(self, osm_data, osm_data_type):
""" Convert data from xml of osm into json
and check if from osm is newer than from database
:param from_osm: Data that got from osm
:type from_osm: dict
:param from_database: Data that in local database
:type from_database: dict
:return: Dictionary of new data that need to be inserted
:rtype: dict
"""
osm_id = self.check_data_on_dict(
osm_data, '@id')
for table, table_data in self.mapping_database_schema.items():
if osm_data_type == table_data['osm_type']:
# check if this osm is not found on database
if self.is_non_recognized_id(osm_data_type, osm_id):
continue
connection = self.create_connection()
cursor = connection.cursor()
try:
                    cursor.execute('select * from %s WHERE %s=%s' % (
                        table, table_data['osm_id_column'], osm_id))
row = cursor.fetchone()
if row:
row = dict(zip(table_data['columns'], row))
new_data = self.get_osm_enrich_new_data(osm_data, row)
                        self.update_enrich_into_database(
                            table, table_data['osm_id_column'], osm_id, new_data)
else:
# if this id is not found add in cache
self.get_or_create_non_recognized_id(osm_data_type, osm_id)
except Exception as e:
self.info('error when processing %s: %s' % (osm_id, e))
connection.close()
def enrich_database_from_diff_file(self):
# check latest diff file
        if self.get_cache_file():
            with open(self.get_cache_file(), 'r') as cache_file:
                self.latest_diff_file = cache_file.read()
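        # diff filenames are sequential, so the lexicographic comparison against
        # the cached name below is enough to skip already-processed files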
        # get the list of diff files
next_latest_diff_file = None
target_folder = self.default['IMPORT_DONE']
self.info('Enrich Database with diff file in %s' % self.default['IMPORT_DONE'])
if not exists(target_folder):
self.info('Folder %s is not ready yet' % target_folder)
return
for filename in sorted(listdir(target_folder)):
try:
if filename.endswith('.gz'):
if not self.latest_diff_file or self.latest_diff_file < filename:
self.info('Processing %s' % filename)
                        # this file is newer than the cached one, process it
gzip_file = join(target_folder, filename)
                        if getsize(gzip_file) > int(self.default['MAX_DIFF_FILE_SIZE']):
self.info('File is too big, skip it')
continue
f = gzip.open(gzip_file, 'rb')
file_content = f.read()
f.close()
raw_content = xmltodict.parse(file_content)
try:
modify_list = raw_content['osmChange']['modify']
                            for element in modify_list:
                                for key, value in element.items():
                                    if not isinstance(value, OrderedDict):
                                        for osm_data in value:
                                            self.enrich_database_from_osm_data(
                                                osm_data, key
                                            )
                                    else:
                                        self.enrich_database_from_osm_data(
                                            value, key
                                        )
                        except KeyError:
                            self.info('%s has no modify element' % filename)
if not next_latest_diff_file or next_latest_diff_file < filename:
next_latest_diff_file = filename
except Exception as e:
self.info('Error when processing %s : %s' % (filename, e))
if next_latest_diff_file:
try:
cache_file = self.get_cache_path()
f = open(cache_file, 'w')
f.write(next_latest_diff_file)
f.close()
except IOError:
self.info('cache file can\'t be created')
def run(self):
"""First checker."""
while True:
self.info('Run enrich process')
if self.check_database():
self.enrich_empty_changeset()
self.enrich_database_from_diff_file()
else:
self.info('Database is not ready')
# sleep looping
self.info('sleeping for %s' % self.default['TIME'])
sleep(float(self.default['TIME']))


if __name__ == '__main__':
enrich = Enrich()
enrich.run()

docker-osmenrich/README.md

@@ -0,0 +1,12 @@
# Docker-osmenrich
Docker-osmenrich is an extension for docker-osm that retrieves the changesets of the OSM data.
It fetches data from the OSM API and also reads the update files generated by docker-osmupdate:
- data is new (changeset is null): fetch it from the OSM API
- data exists but its changeset needs rechecking: read the files generated by osmupdate and update the database

osmenrich will create the following new fields:
- changeset_id
- changeset_timestamp
- changeset_version
- changeset_user
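
For illustration, an enriched row might look like the sketch below (the `osm_buildings` table and all values are hypothetical):

```python
from datetime import datetime

# columns from the mapping file, plus the four fields added by osmenrich
{
    'osm_id': 123456,
    'changeset_id': 7890123,
    'changeset_timestamp': datetime(2020, 1, 10, 8, 42),
    'changeset_version': 3,
    'changeset_user': 'some_mapper',
}
```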

docker-osmenrich/requirements.txt

@@ -0,0 +1,4 @@
psycopg2-binary
python-dateutil
pyyaml
xmltodict