From 06243fbc10c3e70bcf99ec920a5d57b4fced6f06 Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 10:57:56 +0200 Subject: [PATCH 01/11] Avoid invalid polygons in generated tables --- src/import-osm/mapping.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/import-osm/mapping.yml b/src/import-osm/mapping.yml index b73f228..0273c64 100644 --- a/src/import-osm/mapping.yml +++ b/src/import-osm/mapping.yml @@ -321,7 +321,7 @@ tables: - name: id type: id - name: geometry - type: geometry + type: validated_geometry - name: timestamp type: pbf_timestamp - name: type @@ -680,7 +680,7 @@ tables: - name: id type: id - name: geometry - type: geometry + type: validated_geometry - name: timestamp type: pbf_timestamp - key: name From 59ecf843fc9292567f5044e8ec0d663c134cc388 Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 10:58:17 +0200 Subject: [PATCH 02/11] Deal with non existent delete tables --- src/import-sql/triggers.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/import-sql/triggers.sql b/src/import-sql/triggers.sql index dde91a6..bab1340 100644 --- a/src/import-sql/triggers.sql +++ b/src/import-sql/triggers.sql @@ -150,7 +150,7 @@ BEGIN UNION SELECT * FROM osm_tables_delete LOOP - EXECUTE format('DROP TABLE %I CASCADE', t.table_name); + EXECUTE format('DROP TABLE IF EXISTS %I CASCADE', t.table_name); END LOOP; END; $$ language plpgsql; From a0a40430b01b2a48937531c2f0a36cf4f641f3c5 Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 11:46:32 +0200 Subject: [PATCH 03/11] Clip large polygons on high zoom levels --- osm2vectortiles.tm2source/data.yml | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/osm2vectortiles.tm2source/data.yml b/osm2vectortiles.tm2source/data.yml index f02b9c9..13f1cbd 100644 --- a/osm2vectortiles.tm2source/data.yml +++ b/osm2vectortiles.tm2source/data.yml @@ -41,7 +41,13 @@ Layer: WHERE geometry && !bbox! GROUP BY type UNION ALL - SELECT osm_ids2mbid(osm_id, true) AS osm_id, geometry, landuse_class(type) AS class, type + SELECT + osm_ids2mbid(osm_id, true) AS osm_id, + CASE WHEN ST_Area(geometry) > 10000000 AND z(!scale_denominator!) >= 12 + THEN ST_Intersection(ST_MakeValid(geometry), !bbox!) + ELSE geometry + END AS geometry, + landuse_class(type) AS class, type FROM ( SELECT osm_id, geometry, type FROM landuse_z9 @@ -133,7 +139,12 @@ Layer: srid: '' table: |- ( - SELECT osm_ids2mbid(osm_id, true) AS osm_id, geometry + SELECT + osm_ids2mbid(osm_id, true) AS osm_id, + CASE WHEN ST_Area(geometry) > 10000000 AND z(!scale_denominator!) >= 12 AND osm_id <> 0 + THEN ST_Intersection(ST_MakeValid(geometry), !bbox!) + ELSE geometry + END AS geometry FROM ( SELECT osm_id, geometry FROM water_z0 @@ -290,7 +301,13 @@ Layer: srid: '' table: |- ( - SELECT osm_ids2mbid(osm_id, true) AS osm_id, geometry, landuse_overlay_class(type) AS class, type + SELECT + osm_ids2mbid(osm_id, true) AS osm_id, + CASE WHEN ST_Area(geometry) > 10000000 AND z(!scale_denominator!) >= 12 + THEN ST_Intersection(ST_MakeValid(geometry), !bbox!) + ELSE geometry + END AS geometry, + landuse_overlay_class(type) AS class, type FROM ( SELECT osm_id, geometry, type FROM landuse_overlay_z5 WHERE z(!scale_denominator!) = 5 From 1d09caad2f320b4ef203efcab19f1e3f5835272b Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 14:39:21 +0200 Subject: [PATCH 04/11] Clip polygons on all zoom levels --- osm2vectortiles.tm2source/data.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/osm2vectortiles.tm2source/data.yml b/osm2vectortiles.tm2source/data.yml index 13f1cbd..dfb15f9 100644 --- a/osm2vectortiles.tm2source/data.yml +++ b/osm2vectortiles.tm2source/data.yml @@ -43,7 +43,7 @@ Layer: UNION ALL SELECT osm_ids2mbid(osm_id, true) AS osm_id, - CASE WHEN ST_Area(geometry) > 10000000 AND z(!scale_denominator!) >= 12 + CASE WHEN ST_Area(geometry) > 10000000 THEN ST_Intersection(ST_MakeValid(geometry), !bbox!) ELSE geometry END AS geometry, @@ -141,7 +141,7 @@ Layer: ( SELECT osm_ids2mbid(osm_id, true) AS osm_id, - CASE WHEN ST_Area(geometry) > 10000000 AND z(!scale_denominator!) >= 12 AND osm_id <> 0 + CASE WHEN ST_Area(geometry) > 10000000 AND osm_id <> 0 THEN ST_Intersection(ST_MakeValid(geometry), !bbox!) ELSE geometry END AS geometry @@ -303,7 +303,7 @@ Layer: ( SELECT osm_ids2mbid(osm_id, true) AS osm_id, - CASE WHEN ST_Area(geometry) > 10000000 AND z(!scale_denominator!) >= 12 + CASE WHEN ST_Area(geometry) > 10000000 THEN ST_Intersection(ST_MakeValid(geometry), !bbox!) ELSE geometry END AS geometry, From 4010e0c1921e6534d01830add8690d746ac40e8c Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 14:48:04 +0200 Subject: [PATCH 05/11] Let jobs timeout and publish to failed queue --- src/export/export_remote.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/export/export_remote.py b/src/export/export_remote.py index 41b6866..1c81bdb 100644 --- a/src/export/export_remote.py +++ b/src/export/export_remote.py @@ -92,7 +92,7 @@ def render_pyramid_command(source, sink, bounds, min_zoom, max_zoom): '--minzoom', str(min_zoom), '--maxzoom', str(max_zoom), '--bounds={}'.format(bounds), - '--timeout=300000', + '--timeout=120000', '--slow=60000', source, sink ] @@ -103,7 +103,7 @@ def optimize_mbtiles(mbtiles_file, mask_level=8): def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, - render_scheme, bucket_name): + failed_queue_name, render_scheme, bucket_name): host = os.getenv('AWS_S3_HOST', 'mock-s3') port = int(os.getenv('AWS_S3_PORT', 8080)) @@ -147,7 +147,14 @@ def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, raise ValueError("Message must be either of type pyramid or list") start = time.time() - subprocess.check_call(tilelive_cmd) + + try: + subprocess.check_call(tilelive_cmd) + except subprocess.CalledProcessError: + durable_publish(channel, failed_queue_name, body=body) + channel.basic_ack(delivery_tag=method.delivery_tag) + return + end = time.time() print('Rendering time: {}'.format(humanize.naturaltime(end - start))) @@ -208,6 +215,7 @@ def main(args): args[''], args['--job-queue'], 'results', + 'failed-jobs', args['--render_scheme'], args['--bucket'], ) From 6587a1cfccf2f9df15fb8e907a45302eb53b5907 Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 14:49:19 +0200 Subject: [PATCH 06/11] Declare failed jobs queue --- src/export/export_remote.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/export/export_remote.py b/src/export/export_remote.py index 1c81bdb..eaf1823 100644 --- a/src/export/export_remote.py +++ b/src/export/export_remote.py @@ -190,6 +190,7 @@ def configure_rabbitmq(channel): return channel.queue_declare(queue=queue, durable=True) queue_declare('jobs') + queue_declare('failed-jobs') queue_declare('results') From 30f3e9518ae5b7dca4843b04795da19f020b0d81 Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 15:07:30 +0200 Subject: [PATCH 07/11] Let process panic if timeout happens after ACK --- src/export/export_remote.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/export/export_remote.py b/src/export/export_remote.py index eaf1823..f5f2f45 100644 --- a/src/export/export_remote.py +++ b/src/export/export_remote.py @@ -150,10 +150,10 @@ def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, try: subprocess.check_call(tilelive_cmd) - except subprocess.CalledProcessError: + except subprocess.CalledProcessError as e: durable_publish(channel, failed_queue_name, body=body) channel.basic_ack(delivery_tag=method.delivery_tag) - return + raise e end = time.time() From 46a442ed15d6f43fa84684da3d1dc00d2abe40e7 Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Fri, 6 May 2016 15:29:03 +0200 Subject: [PATCH 08/11] Fix try except block --- src/export/export_remote.py | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/src/export/export_remote.py b/src/export/export_remote.py index f5f2f45..374bf96 100644 --- a/src/export/export_remote.py +++ b/src/export/export_remote.py @@ -146,34 +146,31 @@ def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, else: raise ValueError("Message must be either of type pyramid or list") - start = time.time() try: + start = time.time() subprocess.check_call(tilelive_cmd) + end = time.time() + + print('Rendering time: {}'.format(humanize.naturaltime(end - start))) + print('Optimize MBTiles file size') + optimize_mbtiles(mbtiles_file) + upload_mbtiles(bucket, mbtiles_file) + os.remove(mbtiles_file) + + print('Upload mbtiles {}'.format(mbtiles_file)) + + download_link = s3_url(host, port, bucket_name, mbtiles_file) + result_msg = create_result_message(task_id, download_link, msg) + + durable_publish(channel, result_queue_name, + body=json.dumps(result_msg)) + channel.basic_ack(delivery_tag=method.delivery_tag) except subprocess.CalledProcessError as e: durable_publish(channel, failed_queue_name, body=body) channel.basic_ack(delivery_tag=method.delivery_tag) raise e - end = time.time() - - print('Rendering time: {}'.format(humanize.naturaltime(end - start))) - - print('Optimize MBTiles file size') - optimize_mbtiles(mbtiles_file) - upload_mbtiles(bucket, mbtiles_file) - os.remove(mbtiles_file) - - print('Upload mbtiles {}'.format(mbtiles_file)) - - download_link = s3_url(host, port, bucket_name, mbtiles_file) - result_msg = create_result_message(task_id, download_link, msg) - - durable_publish(channel, result_queue_name, - body=json.dumps(result_msg)) - channel.basic_ack(delivery_tag=method.delivery_tag) - - channel.basic_consume(callback, queue=queue_name) try: channel.start_consuming() From 98d19b8227745f0243ebbe4059610a4101948f47 Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Sat, 7 May 2016 15:04:36 +0200 Subject: [PATCH 09/11] Fail faster by having subprocess timeout --- src/export/Dockerfile | 1 + src/export/export_remote.py | 30 +++++++++++++++++++----------- src/export/requirements.txt | 1 + 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/export/Dockerfile b/src/export/Dockerfile index 024bc90..869c907 100644 --- a/src/export/Dockerfile +++ b/src/export/Dockerfile @@ -19,6 +19,7 @@ RUN npm -g outdated | grep -v npm RUN apt-get update && apt-get install -y --no-install-recommends \ python \ python-pip \ + python-dev \ && rm -rf /var/lib/apt/lists/ VOLUME /data/tm2source /data/export diff --git a/src/export/export_remote.py b/src/export/export_remote.py index 374bf96..f8881b0 100644 --- a/src/export/export_remote.py +++ b/src/export/export_remote.py @@ -16,18 +16,22 @@ Options: """ import time -import subprocess +import sys import os import os.path import json import humanize - -from boto.s3.connection import S3Connection, OrdinaryCallingFormat -from mbtoolbox.optimize import remove_subpyramids - import pika +from boto.s3.connection import S3Connection, OrdinaryCallingFormat +from mbtoolbox.optimize import find_optimizable_tiles, all_descendant_tiles +from mbtoolbox.mbtiles import MBTiles from docopt import docopt +if os.name == 'posix' and sys.version_info[0] < 3: + import subprocess32 as subprocess +else: + import subprocess + def s3_url(host, port, bucket_name, file_name): protocol = 'https' if port == 443 else 'http' @@ -86,20 +90,24 @@ def render_tile_list_command(source, sink, list_file): def render_pyramid_command(source, sink, bounds, min_zoom, max_zoom): + # Slow tiles should timeout as fast as possible so job can fail return [ 'tilelive-copy', '--scheme', 'pyramid', '--minzoom', str(min_zoom), '--maxzoom', str(max_zoom), '--bounds={}'.format(bounds), - '--timeout=120000', - '--slow=60000', + '--timeout=40000', source, sink ] def optimize_mbtiles(mbtiles_file, mask_level=8): - remove_subpyramids(mbtiles_file, mask_level, 'tms') + mbtiles = MBTiles(mbtiles_file, 'tms') + + for tile in find_optimizable_tiles(mbtiles, mask_level, 'tms'): + tiles = all_descendant_tiles(x=tile.x, y=tile.y, zoom=tile.z, max_zoom=14) + mbtiles.remove_tiles(tiles) def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, @@ -146,10 +154,9 @@ def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, else: raise ValueError("Message must be either of type pyramid or list") - try: start = time.time() - subprocess.check_call(tilelive_cmd) + subprocess.check_call(tilelive_cmd, timeout=5*60) end = time.time() print('Rendering time: {}'.format(humanize.naturaltime(end - start))) @@ -166,9 +173,10 @@ def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, durable_publish(channel, result_queue_name, body=json.dumps(result_msg)) channel.basic_ack(delivery_tag=method.delivery_tag) - except subprocess.CalledProcessError as e: + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: durable_publish(channel, failed_queue_name, body=body) channel.basic_ack(delivery_tag=method.delivery_tag) + time.sleep(5) # Give RabbitMQ some time raise e channel.basic_consume(callback, queue=queue_name) diff --git a/src/export/requirements.txt b/src/export/requirements.txt index 052a3d2..07f92e9 100644 --- a/src/export/requirements.txt +++ b/src/export/requirements.txt @@ -2,5 +2,6 @@ docopt==0.6.2 boto==2.38.0 pika==0.10.0 humanize==0.5.1 +subprocess32==3.2.7 -e git+https://github.com/lukasmartinelli/mbtoolbox.git@#egg=mbtoolbox -e git+https://github.com/mapbox/mbutil.git@#egg=mbutil-0.2.0beta From 9ef4e187867d26ef49571115d4290be3a0dc824b Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Sun, 8 May 2016 17:42:58 +0200 Subject: [PATCH 10/11] Stop channel on subprocess exception --- src/export/export_remote.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/export/export_remote.py b/src/export/export_remote.py index f8881b0..2ca9673 100644 --- a/src/export/export_remote.py +++ b/src/export/export_remote.py @@ -176,6 +176,7 @@ def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name, except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: durable_publish(channel, failed_queue_name, body=body) channel.basic_ack(delivery_tag=method.delivery_tag) + channel.stop_consuming() time.sleep(5) # Give RabbitMQ some time raise e From 7c440c81a527bb14f4b0a20e72f0e92ae5b661bc Mon Sep 17 00:00:00 2001 From: lukasmartinelli Date: Sun, 8 May 2016 17:44:12 +0200 Subject: [PATCH 11/11] Have low prefetch count when merging --- src/merge-jobs/merge-jobs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/merge-jobs/merge-jobs.py b/src/merge-jobs/merge-jobs.py index 5869891..815cef4 100644 --- a/src/merge-jobs/merge-jobs.py +++ b/src/merge-jobs/merge-jobs.py @@ -69,6 +69,8 @@ def merge_results(rabbitmq_url, merge_target, result_queue_name): connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url)) channel = connection.channel() + channel.basic_qos(prefetch_count=3) + channel.confirm_delivery() def callback(ch, method, properties, body): msg = json.loads(body.decode('utf-8'))