refactoring osmupdate

pull/28/head
Etienne Trimaille 2015-12-29 14:05:57 +01:00
rodzic 4c1899854f
commit c3692553c2
1 zmienionych plików z 113 dodań i 91 usunięć

Wyświetl plik

@ -27,113 +27,135 @@ from datetime import datetime
from time import sleep from time import sleep
from sys import stderr from sys import stderr
# Default values which can be overwritten.
default = {
'MAX_DAYS': '100',
'DIFF': 'sporadic',
'MAX_MERGE': '7',
'COMPRESSION_LEVEL': '1',
'BASE_URL': 'http://planet.openstreetmap.org/replication/',
'IMPORT_QUEUE': 'import_queue',
'IMPORT_DONE': 'import_done',
'SETTINGS': 'settings',
'TIME': 120,
}
for key in environ.keys(): class Downloader(object):
if key in default.keys():
default[key] = environ[key]
# Folders def __init__(self):
folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS'] # Default values which can be overwritten.
for folder in folders: self.default = {
if not isabs(default[folder]): 'MAX_DAYS': '100',
# Get the absolute path. 'DIFF': 'sporadic',
default[folder] = abspath(default[folder]) 'MAX_MERGE': '7',
'COMPRESSION_LEVEL': '1',
'BASE_URL': 'http://planet.openstreetmap.org/replication/',
'IMPORT_QUEUE': 'import_queue',
'IMPORT_DONE': 'import_done',
'SETTINGS': 'settings',
'TIME': 120,
}
self.osm_file = None
# Test the folder @staticmethod
if not exists(default[folder]): def info(message):
print >> stderr, 'The folder %s does not exist.' % default[folder] print message
@staticmethod
def error(message):
print >> stderr, message
exit() exit()
# Test files def overwrite_environment(self):
osm_file = None """Overwrite default values from the environment."""
for f in listdir(default['SETTINGS']): for key in environ.keys():
if key in self.default.keys():
self.default[key] = environ[key]
if f.endswith('.pbf'): def check_settings(self):
osm_file = join(default['SETTINGS'], f) """Perform various checking."""
# Folders
folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS']
for folder in folders:
if not isabs(self.default[folder]):
# Get the absolute path.
self.default[folder] = abspath(self.default[folder])
""" # Test the folder
# Todo : need fix custom URL and sporadic diff : daily, hourly and minutely if not exists(self.default[folder]):
if f == 'custom_url_diff.txt': msg = 'The folder %s does not exist.' % self.default[folder]
with open(join(default['SETTINGS'], f), 'r') as content_file: self.error(msg)
default['BASE_URL'] = content_file.read()
"""
if not osm_file: # Test files
print >> stderr, 'OSM file *.osm.pbf is missing in %s' % default['SETTINGS'] for f in listdir(self.default['SETTINGS']):
exit()
# In docker-compose, we should wait for the DB is ready. if f.endswith('.pbf'):
print 'The checkup is OK. The container will continue soon, after the database.' self.osm_file = join(self.default['SETTINGS'], f)
sleep(45)
# Finally launch the listening process. if not self.osm_file:
while True: msg = 'OSM file *.osm.pbf is missing in %s' % self.default['SETTINGS']
# Check if diff to be imported is empty. If not, take the latest diff. self.error(msg)
diff_to_be_imported = sorted(listdir(default['IMPORT_QUEUE']))
if len(diff_to_be_imported): self.info('The checkup is OK. The container will continue soon, after the database.')
file_name = diff_to_be_imported[-1].split('.')[0] sleep(45)
timestamp = file_name.split('->-')[1]
print 'Timestamp from the latest not imported diff : %s' % timestamp def _check_latest_timestamp(self):
else: """Fetch the latest timestamp."""
# Check if imported diff is empty. If not, take the latest diff. # Check if diff to be imported is empty. If not, take the latest diff.
imported_diff = sorted(listdir(default['IMPORT_DONE'])) diff_to_be_imported = sorted(listdir(self.default['IMPORT_QUEUE']))
if len(imported_diff): if len(diff_to_be_imported):
file_name = imported_diff[-1].split('.')[0] file_name = diff_to_be_imported[-1].split('.')[0]
timestamp = file_name.split('->-')[1] timestamp = file_name.split('->-')[1]
print 'Timestamp from the latest imported diff : %s' % timestamp self.info('Timestamp from the latest not imported diff : %s' % timestamp)
else: else:
# Take the timestamp from original file. # Check if imported diff is empty. If not, take the latest diff.
command = ['osmconvert', osm_file, '--out-timestamp'] imported_diff = sorted(listdir(self.default['IMPORT_DONE']))
processus = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) if len(imported_diff):
timestamp, err = processus.communicate() file_name = imported_diff[-1].split('.')[0]
timestamp = file_name.split('->-')[1]
self.info('Timestamp from the latest imported diff : %s' % timestamp)
# Remove new line else:
timestamp = timestamp.strip() # Take the timestamp from original file.
command = ['osmconvert', self.osm_file, '--out-timestamp']
processus = Popen(
command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
timestamp, err = processus.communicate()
print 'Timestamp from the original state file : %s' % timestamp # Remove new line
timestamp = timestamp.strip()
# Removing some \ in the timestamp. self.info('Timestamp from the original state file : %s' % timestamp)
timestamp = timestamp.replace('\\', '')
# Save time # Removing some \ in the timestamp.
current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') timestamp = timestamp.replace('\\', '')
print 'Old time : %s' % timestamp return timestamp
print 'Current time : %s' % current_time
# Destination def download(self):
file_name = '%s->-%s.osc.gz' % (timestamp, current_time) """Infinite loop to download diff files on a regular interval."""
file_path = join(default['IMPORT_QUEUE'], file_name) while True:
timestamp = self._check_latest_timestamp()
# Command # Save time
command = ['osmupdate', '-v'] current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
command += ['--max-days=' + default['MAX_DAYS']] self.info('Old time : %s' % timestamp)
command += [default['DIFF']] self.info('Current time : %s' % current_time)
command += ['--max-merge=' + default['MAX_MERGE']]
command += ['--compression-level=' + default['COMPRESSION_LEVEL']]
command += ['--base-url=' + default['BASE_URL']]
command.append(timestamp)
command.append(file_path)
print ' '.join(command) # Destination
if call(command) != 0: file_name = '%s->-%s.osc.gz' % (timestamp, current_time)
print >> stderr, 'An error occured in osmupdate. Let\'s try again.' file_path = join(self.default['IMPORT_QUEUE'], file_name)
# Sleep less.
print 'Sleeping for 2 seconds.' # Command
sleep(2.0) command = ['osmupdate', '-v']
else: command += ['--max-days=' + self.default['MAX_DAYS']]
# Everything was fine, let's sleeping. command += [self.default['DIFF']]
print 'Sleeping for %s seconds.' % default['TIME'] command += ['--max-merge=' + self.default['MAX_MERGE']]
sleep(float(default['TIME'])) command += ['--compression-level=' + self.default['COMPRESSION_LEVEL']]
command += ['--base-url=' + self.default['BASE_URL']]
command.append(timestamp)
command.append(file_path)
self.info(' '.join(command))
if call(command) != 0:
self.info('An error occured in osmupdate. Let\'s try again.')
# Sleep less.
self.info('Sleeping for 2 seconds.')
sleep(2.0)
else:
# Everything was fine, let's sleeping.
self.info('Sleeping for %s seconds.' % self.default['TIME'])
sleep(float(self.default['TIME']))
if __name__ == '__main__':
downloader = Downloader()
downloader.overwrite_environment()
downloader.check_settings()
downloader.download()