#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
/***************************************************************************
Docker-OSM Enrich
Enrichment service for the docker-osm database.
-------------------
begin : 2019-03-13
email : irwan at kartoza dot com
contributor : Irwan Fathurrahman
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
import gzip
from os import environ, listdir, mkdir
from os.path import join, exists, getsize
from sys import exit, stderr
from time import sleep
from urllib import request
import xmltodict
import yaml
from dateutil import parser
from psycopg2 import connect, OperationalError, ProgrammingError
class Enrich(object):
mapping_type = {
'point': 'node',
'linestring': 'way',
'polygon': 'way'
}
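    # The mapping above ties imposm geometry types to the OSM element types
    # used by the API: points come from nodes, linestrings and polygons from
    # ways; relations are not handled here.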
enriched_column = {
'changeset_id': 'int',
'changeset_version': 'int',
'changeset_timestamp': 'datetime',
'changeset_user': 'string'
}
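    # These enrichment columns are added to every mapped table by
    # check_database(): 'int' becomes NUMERIC, 'string' becomes
    # CHARACTER VARYING (255) and 'datetime' becomes TIMESTAMPTZ.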
latest_diff_file = None
cache_folder = None
out_of_scope_osm_folder = None
def __init__(self):
# Default values which can be overwritten by environment variable.
self.default = {
'TIME': 120,
'POSTGRES_USER': 'docker',
'POSTGRES_PASS': 'docker',
'POSTGRES_DBNAME': 'gis',
'POSTGRES_HOST': 'db',
'POSTGRES_PORT': '5432',
'SETTINGS': 'settings',
'OSM_API_URL': 'https://api.openstreetmap.org/api/0.6/',
'IMPORT_DONE': 'import_done',
'CACHE': 'cache',
'MAX_DIFF_FILE_SIZE': 100000000,
'CACHE_MODIFY_CHECK': ''
}
self.mapping_file = None
self.mapping_database_schema = {}
self.postgis_uri = None
self.overwrite_environment()
self.check_settings()
def check_settings(self):
"""Perform various checking.
This will run when the container is starting. If an error occurs, the
container will stop.
"""
# Test files
try:
for f in listdir(self.default['SETTINGS']):
if f.endswith('.yml'):
self.mapping_file = join(self.default['SETTINGS'], f)
except FileNotFoundError:
pass
if not self.mapping_file:
self.error(
'Mapping file *.yml is missing in %s' % self.default['SETTINGS']
)
else:
self.info('Mapping: ' + self.mapping_file)
        # In docker-compose, we should wait for the DB to be ready.
self.check_mapping_file_data()
self.info('The checkup is OK.')
# enrich
cache_folder = self.default['CACHE']
if exists(cache_folder):
cache_folder = join(cache_folder, 'enrich')
if not exists(cache_folder):
mkdir(cache_folder)
# out_of_scope_osm
out_of_scope_osm_folder = join(
cache_folder, 'out_of_scope_osm')
if not exists(out_of_scope_osm_folder):
mkdir(out_of_scope_osm_folder)
self.out_of_scope_osm_folder = out_of_scope_osm_folder
self.cache_folder = cache_folder
# check using not found cache for modify
if self.default['CACHE_MODIFY_CHECK'].lower() == 'true':
self.default['CACHE_MODIFY_CHECK'] = True
else:
self.default['CACHE_MODIFY_CHECK'] = False
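        # When CACHE_MODIFY_CHECK is enabled, osm ids that are not found in
        # the local database are cached on disk (see
        # get_or_create_non_recognized_id) so later diff files can skip them.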
def get_cache_path(self):
return join(self.cache_folder, 'cache')
def get_cache_file(self):
""" Return path of cache file
return None if not found
"""
if self.cache_folder:
if exists(self.cache_folder):
cache_file = self.get_cache_path()
if exists(cache_file):
return cache_file
return None
def is_non_recognized_id(self, osm_type, osm_id):
""" Return if osm id and type is unrecognized id
"""
if not self.default['CACHE_MODIFY_CHECK']:
return False
if self.out_of_scope_osm_folder:
if exists(
join(self.out_of_scope_osm_folder,
'%s-%s' % (osm_type, osm_id))):
return True
return False
def get_or_create_non_recognized_id(self, osm_type, osm_id):
""" Create file as cache for non recognized id
"""
if not self.default['CACHE_MODIFY_CHECK']:
return
if self.out_of_scope_osm_folder:
filename = join(
self.out_of_scope_osm_folder,
'%s-%s' % (osm_type, osm_id))
if not exists(filename):
try:
f = open(filename, 'w+')
f.close()
except IOError:
self.info('%s can\'t be created' % filename)
def check_mapping_file_data(self):
"""Perform converting yaml data into json
that used for checking table on database
"""
        self.info('Load mapping file data.')
        with open(self.mapping_file, 'r') as document:
            mapping_data = yaml.safe_load(document)
        try:
            for table, value in mapping_data['tables'].items():
                try:
                    geometry_type = value['type']
                    try:
                        osm_type = self.mapping_type[geometry_type]
                        osm_id_column = None
                        osm_id_column_index = None
                        columns = ['id']
                        for index, column in enumerate(value['columns']):
                            columns.append(column['name'])
                            try:
                                if column['type'] == 'id':
                                    osm_id_column = column['name']
                                    osm_id_column_index = index
                            except KeyError:
                                pass
                        columns.extend(self.enriched_column.keys())
                        self.mapping_database_schema['osm_%s' % table] = {
                            'osm_type': osm_type,
                            'osm_id_column': osm_id_column,
                            'osm_id_column_index': osm_id_column_index,
                            'columns': columns
                        }
                    except KeyError:
                        self.info(
                            'Type %s is not yet recognized by enrich.'
                            % geometry_type)
                except KeyError:
                    self.info(
                        'Table %s doesn\'t have a "type" attribute' % table
                    )
        except KeyError:
            self.error(
                'Mapping file %s doesn\'t have a "tables" attribute'
                % self.mapping_file
            )
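    # For reference, check_mapping_file_data() accepts a mapping file shaped
    # like this sketch (illustrative only, modeled on the imposm3 mapping
    # format; the table and column names here are made up):
    #
    # tables:
    #   buildings:
    #     type: polygon
    #     columns:
    #       - name: osm_id
    #         type: id
    #       - name: geometry
    #         type: geometry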
    def create_connection(self):
        return connect(
            "dbname='%s' user='%s' host='%s' port='%s' password='%s'" % (
                self.default['POSTGRES_DBNAME'],
                self.default['POSTGRES_USER'],
                self.default['POSTGRES_HOST'],
                self.default['POSTGRES_PORT'],
                self.default['POSTGRES_PASS']))
def check_database(self):
"""Test connection to PostGIS and create the URI."""
connection = self.create_connection()
cursor = connection.cursor()
try:
for table, table_data in self.mapping_database_schema.items():
new_columns_postgis = []
for enrich_key, enrich_type in self.enriched_column.items():
check_column = ''' SELECT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name='%s' and column_name='%s'); ''' % (
table, enrich_key)
cursor.execute(check_column)
column_existence = cursor.fetchone()[0]
                    if not column_existence:
if enrich_type == 'int':
new_columns_postgis.append('ADD COLUMN IF NOT EXISTS %s NUMERIC' % enrich_key)
elif enrich_type == 'string':
new_columns_postgis.append('ADD COLUMN IF NOT EXISTS %s CHARACTER VARYING (255)' % enrich_key)
elif enrich_type == 'datetime':
new_columns_postgis.append('ADD COLUMN IF NOT EXISTS %s TIMESTAMPTZ' % enrich_key)
if len(new_columns_postgis) > 0:
query = 'ALTER TABLE public."%s" %s;' % (table, ','.join(new_columns_postgis))
cursor.execute(query)
connection.commit()
connection.close()
return True
except (OperationalError, ProgrammingError):
connection.rollback()
connection.close()
return False
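    # An illustrative statement produced by check_database() for a
    # hypothetical table osm_buildings:
    #   ALTER TABLE public."osm_buildings"
    #       ADD COLUMN IF NOT EXISTS changeset_id NUMERIC,
    #       ADD COLUMN IF NOT EXISTS changeset_version NUMERIC,
    #       ADD COLUMN IF NOT EXISTS changeset_timestamp TIMESTAMPTZ,
    #       ADD COLUMN IF NOT EXISTS changeset_user CHARACTER VARYING (255);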
@staticmethod
def info(message):
print(message)
@staticmethod
def error(message):
        stderr.write('%s\n' % message)
        exit(1)
def overwrite_environment(self):
"""Overwrite default values from the environment."""
for key in list(environ.keys()):
if key in list(self.default.keys()):
self.default[key] = environ[key]
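    # Any default above can be overridden through the container environment,
    # e.g. in a docker-compose service definition (illustrative values):
    #
    #   environment:
    #     - TIME=300
    #     - POSTGRES_HOST=postgis
    #     - CACHE_MODIFY_CHECK=true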
def check_data_on_dict(self, data, key):
""" return data from dict with key
return None if not found
"""
try:
return data[key]
except KeyError:
return None
def get_osm_enrich_new_data(self, from_osm, from_database):
""" Convert data from xml of osm into json
and check if from osm is newer than from database
:param from_osm: Data that got from osm
:type from_osm: dict
:param from_database: Data that in local database
:type from_database: dict
:return: Dictionary of new data that need to be inserted
:rtype: dict
"""
osm_id = self.check_data_on_dict(from_osm, '@id')
row = from_database
        new_data = {}
if osm_id and row:
allow_updated = False
osm_timestamp = self.check_data_on_dict(from_osm, '@timestamp')
osm_datetime = parser.parse(osm_timestamp).replace(tzinfo=None)
if not row['changeset_timestamp'] or row['changeset_timestamp'] < osm_datetime:
allow_updated = True
if allow_updated:
osm_version = self.check_data_on_dict(from_osm, '@version')
osm_changeset = self.check_data_on_dict(from_osm, '@changeset')
osm_user = self.check_data_on_dict(from_osm, '@user')
self.info('Update for %s' % osm_id)
new_data = {
'changeset_id': osm_changeset,
'changeset_timestamp': osm_datetime,
'changeset_version': osm_version,
'changeset_user': osm_user
}
return new_data
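    # xmltodict exposes XML attributes with an '@' prefix, so `from_osm`
    # above is roughly shaped like this sketch (illustrative values):
    #   {'@id': '4567', '@version': '3', '@changeset': '89123',
    #    '@timestamp': '2019-03-13T07:00:00Z', '@user': 'some_mapper'}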
def update_enrich_into_database(self, table_name, osm_id_column, osm_id, new_data):
""" Update new data into data of osm id
:param table_name: Table source of rows
:type table_name: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
:param osm_id: osm id of data
:type osm_id: str
:param new_data: new data that will be updated
:type new_data: Dict
"""
if not new_data:
return
sets = []
for field, value in new_data.items():
try:
value = value.replace('\'', '\'\'')
except (TypeError, AttributeError):
pass
sets.append('%s=\'%s\'' % (field, value))
connection = self.create_connection()
cursor = connection.cursor()
try:
query = 'UPDATE %s SET %s WHERE %s=%s' % (
table_name, ','.join(sets), osm_id_column, osm_id)
cursor.execute(query)
connection.commit()
except ProgrammingError as e:
connection.rollback()
self.info('%s' % e)
connection.close()
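    # The generated statement is plain string SQL; with a hypothetical table
    # and id it reads like:
    #   UPDATE osm_buildings SET changeset_id='89123',changeset_version='3',
    #       changeset_user='some_mapper' WHERE osm_id=4567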
# THIS PROCESS BELOW IS FOR EMPTY CHANGESET
def update_osm_enrich_from_api_in_batch(
self, osm_ids, osm_type, row_batch, table_name, osm_id_column):
""" Get osm data from OSM API in Batch
:param osm_ids: osm id in list
:type osm_ids: list
        :param osm_type: feature type of this osm
        :type osm_type: str
:param row_batch: Row data from local database in dictionary
:type row_batch: Dict
:param table_name: Table source of rows
:type table_name: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
"""
if len(osm_ids) == 0:
return
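        # The OSM API 0.6 supports fetching multiple elements per request via
        # the plural endpoint; the URL built below looks like (illustrative ids):
        #   https://api.openstreetmap.org/api/0.6/nodes?nodes=123,124,125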
osm_type_on_url = osm_type + 's'
url = join(self.default['OSM_API_URL'], osm_type_on_url)
url += '?%s=%s' % (osm_type_on_url, ','.join(osm_ids))
self.info(url)
try:
raw_content = request.urlopen(url).read()
raw_content = xmltodict.parse(raw_content)
            if isinstance(raw_content['osm'][osm_type], list):
for osm in raw_content['osm'][osm_type]:
osm_id = self.check_data_on_dict(osm, '@id')
row = self.check_data_on_dict(row_batch, osm_id)
new_data = self.get_osm_enrich_new_data(
osm, row)
self.update_enrich_into_database(
table_name, osm_id_column, osm_id, new_data)
else:
osm_id = self.check_data_on_dict(
raw_content['osm'][osm_type], '@id')
row = self.check_data_on_dict(row_batch, osm_id)
new_data = self.get_osm_enrich_new_data(
raw_content['osm'][osm_type], row)
self.update_enrich_into_database(
table_name, osm_id_column, osm_id, new_data)
except Exception as e:
self.info('%s' % e)
def process_empty_changeset_from_table(self, table_name, table_columns, osm_id_column, osm_type):
""" Processing all data from table
:param table_name: Table source
:type table_name: str
:param table_columns: columns of tables
:type table_columns: list
        :param osm_id_column: Column name of osm_id
        :type osm_id_column: str
        :param osm_type: feature type of this osm
        :type osm_type: str
"""
# noinspection PyUnboundLocalVariable
connection = self.create_connection()
cursor = connection.cursor()
row_batch = {}
osm_ids = []
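        # Rows missing a changeset_timestamp are fetched one by one and
        # flushed to the OSM API in batches of 30 ids; the final partial
        # batch is sent after the loop ends.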
try:
2021-02-09 15:47:37 +00:00
check_sql = ''' select * from "%s" WHERE "changeset_timestamp" IS NULL AND "osm_id" IS NOT NULL ORDER BY "osm_id" ''' % table_name
cursor.execute(check_sql)
row = True
while row:
# do something with row
row = cursor.fetchone()
if row:
row = dict(zip(table_columns, row))
row_batch['%s' % row[osm_id_column]] = row
osm_ids.append('%s' % row[osm_id_column])
if len(osm_ids) == 30:
self.update_osm_enrich_from_api_in_batch(
osm_ids, osm_type, row_batch, table_name, osm_id_column)
row_batch = {}
osm_ids = []
self.update_osm_enrich_from_api_in_batch(
osm_ids, osm_type, row_batch, table_name, osm_id_column)
        except ProgrammingError as e:
            connection.rollback()
            self.info('%s' % e)
        connection.close()
def enrich_empty_changeset(self):
"""Enrich database that has empty changeset by using OSM API URL
"""
self.info('Enrich Database with empty changeset')
        for table, table_data in self.mapping_database_schema.items():
            osm_id_column = table_data['osm_id_column']
            osm_type = table_data['osm_type']
            columns = table_data['columns']
            if osm_id_column is not None:
                self.info('Checking data from table %s' % table)
                self.process_empty_changeset_from_table(
                    table, columns, osm_id_column, osm_type)
            else:
                self.info('The osm_id column for %s is unknown.' % table)
# THIS PROCESS BELOW IS FOR CHECKING DIFF FILES
def enrich_database_from_osm_data(self, osm_data, osm_data_type):
""" Convert data from xml of osm into json
and check if from osm is newer than from database
:param from_osm: Data that got from osm
:type from_osm: dict
:param from_database: Data that in local database
:type from_database: dict
:return: Dictionary of new data that need to be inserted
:rtype: dict
"""
osm_id = self.check_data_on_dict(
osm_data, '@id')
for table, table_data in self.mapping_database_schema.items():
if osm_data_type == table_data['osm_type']:
# check if this osm is not found on database
if self.is_non_recognized_id(osm_data_type, osm_id):
continue
connection = self.create_connection()
cursor = connection.cursor()
try:
                    validate_sql = ''' select * from "%s" WHERE "%s"=%s ''' % (
                        table, table_data['osm_id_column'], osm_id)
cursor.execute(validate_sql)
row = cursor.fetchone()
if row:
row = dict(zip(table_data['columns'], row))
new_data = self.get_osm_enrich_new_data(osm_data, row)
                        self.update_enrich_into_database(
                            table, table_data['osm_id_column'], osm_id, new_data)
else:
# if this id is not found add in cache
self.get_or_create_non_recognized_id(osm_data_type, osm_id)
except Exception as e:
self.info('error when processing %s: %s' % (osm_id, e))
connection.close()
    def enrich_database_from_diff_file(self):
        """Enrich the database using the diff files already imported."""
        # check latest diff file
        if self.get_cache_file():
            with open(self.get_cache_file(), 'r') as cache:
                self.latest_diff_file = cache.read()
# get list diff file
next_latest_diff_file = None
target_folder = self.default['IMPORT_DONE']
self.info('Enrich Database with diff file in %s' % self.default['IMPORT_DONE'])
if not exists(target_folder):
self.info('Folder %s is not ready yet' % target_folder)
return
for filename in sorted(listdir(target_folder)):
try:
if filename.endswith('.gz'):
if not self.latest_diff_file or self.latest_diff_file < filename:
self.info('Processing %s' % filename)
                        # this file is newer than the last one processed
gzip_file = join(target_folder, filename)
                        if getsize(gzip_file) > int(self.default['MAX_DIFF_FILE_SIZE']):
                            self.info('File is too big, skipping it')
continue
                        with gzip.open(gzip_file, 'rb') as f:
                            file_content = f.read()
raw_content = xmltodict.parse(file_content)
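                        # A diff file is an osmChange document, roughly shaped
                        # like this sketch (illustrative values):
                        #   <osmChange>
                        #     <modify>
                        #       <node id="4567" version="3" changeset="89123"
                        #             user="some_mapper" timestamp="2019-03-13T07:00:00Z"/>
                        #     </modify>
                        #   </osmChange>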
                        try:
                            modify_list = raw_content['osmChange']['modify']
                            for modify in modify_list:
                                for key, value in modify.items():
                                    if isinstance(value, list):
                                        for osm_data in value:
                                            self.enrich_database_from_osm_data(
                                                osm_data, key)
                                    else:
                                        self.enrich_database_from_osm_data(
                                            value, key)
                        except KeyError:
                            self.info('No "modify" section found in %s' % filename)
if not next_latest_diff_file or next_latest_diff_file < filename:
next_latest_diff_file = filename
except Exception as e:
self.info('Error when processing %s : %s' % (filename, e))
if next_latest_diff_file:
try:
                cache_file = self.get_cache_path()
                with open(cache_file, 'w') as f:
                    f.write(next_latest_diff_file)
except IOError:
self.info('cache file can\'t be created')
    def locate_table(self, name):
        """Check whether a table matching the given name pattern exists in the DB."""
        connection = self.create_connection()
        cursor = connection.cursor()
        sql = """ SELECT EXISTS (SELECT 1 AS result FROM information_schema.tables WHERE table_name LIKE '%s'); """ % name
        cursor.execute(sql)
# noinspection PyUnboundLocalVariable
return cursor.fetchone()[0]
def run(self):
"""First checker."""
while True:
self.info('Run enrich process')
osm_tables = self.locate_table('osm_%')
            if not osm_tables:
self.info('Imposm is still running, wait a while and try again')
else:
if self.check_database():
self.enrich_empty_changeset()
self.enrich_database_from_diff_file()
# sleep looping
self.info('sleeping for %s' % self.default['TIME'])
sleep(float(self.default['TIME']))
if __name__ == '__main__':
enrich = Enrich()
enrich.run()